From 413fae5cfcda2e179c4c17dd5a73fd2baf845c30 Mon Sep 17 00:00:00 2001 From: Ian Parra Date: Sat, 31 May 2025 21:50:51 -0400 Subject: [PATCH 01/36] feat: send server error to client --- server/src/signaling/server.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/server/src/signaling/server.rs b/server/src/signaling/server.rs index 4282535..0040bb2 100644 --- a/server/src/signaling/server.rs +++ b/server/src/signaling/server.rs @@ -134,6 +134,7 @@ async fn handle_connection( }, Err(e) => { error!("Error: {e}"); + write.send(PacketS2C::Error { error: e.to_string() }).await?; } } } From 384d374067c3ab4ef30ea999e82fad63ed308530 Mon Sep 17 00:00:00 2001 From: Ian Parra Date: Sat, 31 May 2025 21:51:18 -0400 Subject: [PATCH 02/36] fix: more explicit feedback --- server/src/signaling/packets.rs | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/server/src/signaling/packets.rs b/server/src/signaling/packets.rs index 43d03ae..7e77882 100644 --- a/server/src/signaling/packets.rs +++ b/server/src/signaling/packets.rs @@ -170,10 +170,12 @@ pub enum ServerError { NotConnected, #[error("Media type already has an existing track!")] MediaTypeSatisfied, - #[error("Request cannot be parsed.")] - NotDeserializable, + #[error("Bad Request. Reason: {reason}")] + BadRequest { reason: String }, #[error("Request type is unknown.")] UnknownRequest, + #[error("Received message is not a text.")] + UnproccesableEntity, } impl std::fmt::Display for MediaType { @@ -194,13 +196,14 @@ impl PacketC2S { match serde_json::from_str(&text) { Ok(packet) => Ok(packet), Err(e) => { - error!("Error: {e}"); error!("Tried to parse packet: {text}"); - Err(ServerError::UnknownRequest) + let reason = e.to_string(); + error!("Error: {reason}"); + Err(ServerError::BadRequest { reason }) } } } else { - Err(ServerError::NotDeserializable) + Err(ServerError::UnproccesableEntity) } } } From 44a67a84227d4d4791f56a23e48ed193b40012e1 Mon Sep 17 00:00:00 2001 From: Ian Parra Date: Sat, 31 May 2025 22:21:15 -0400 Subject: [PATCH 03/36] feat: Server error as packet --- server/src/signaling/packets.rs | 23 ++++++++++++++++------- server/src/signaling/server.rs | 11 ++++++++--- 2 files changed, 24 insertions(+), 10 deletions(-) diff --git a/server/src/signaling/packets.rs b/server/src/signaling/packets.rs index 7e77882..16ccd51 100644 --- a/server/src/signaling/packets.rs +++ b/server/src/signaling/packets.rs @@ -2,7 +2,10 @@ use thiserror::Error; use tokio_tungstenite::tungstenite::Message; use volcano_sfu::rtc::{peer::JoinConfig, room::RoomInfo}; -use webrtc::{ice_transport::ice_candidate::RTCIceCandidateInit, peer_connection::sdp::session_description::RTCSessionDescription}; +use webrtc::{ + ice_transport::ice_candidate::RTCIceCandidateInit, + peer_connection::sdp::session_description::RTCSessionDescription, +}; /// Available types of media tracks #[derive(Debug, Clone, Eq, Hash, PartialEq, Serialize, Deserialize)] @@ -48,9 +51,7 @@ pub enum Negotiation { #[serde(tag = "type")] pub enum PacketC2S { /// Answer (from client subscriber) - Answer { - description: RTCSessionDescription, - }, + Answer { description: RTCSessionDescription }, /// Offer (from negotiation) Offer { id: u32, @@ -101,7 +102,7 @@ pub enum PacketS2C { /// Accept authentication Accept { id: u32, - available_rooms: Vec + available_rooms: Vec, }, /// Answer (for client publisher) Answer { @@ -152,7 +153,13 @@ pub enum PacketS2C { user_id: String, }, /// Disconnection error - Error { error: String }, + Error { + error: String, + }, + /// Custom server error + ServerError { + error: ServerError, + }, } /// An error occurred on the server @@ -166,6 +173,8 @@ pub enum ServerError { FailedToAuthenticate, #[error("Already connected to a room!")] AlreadyConnected, + #[error("Not authenticated in this session.")] + NotAuthenticated, #[error("Not connected to any room!")] NotConnected, #[error("Media type already has an existing track!")] @@ -252,4 +261,4 @@ pub struct RemoteMedia { pub audio: bool, #[serde(skip_serializing_if = "Option::is_none")] pub layers: Option>, -} \ No newline at end of file +} diff --git a/server/src/signaling/server.rs b/server/src/signaling/server.rs index 0040bb2..60a4d4b 100644 --- a/server/src/signaling/server.rs +++ b/server/src/signaling/server.rs @@ -130,11 +130,16 @@ async fn handle_connection( } break; } - _ => {} + _ => { + write + .send(PacketS2C::ServerError { + error: ServerError::NotAuthenticated, + }) + .await?; + } }, Err(e) => { - error!("Error: {e}"); - write.send(PacketS2C::Error { error: e.to_string() }).await?; + write.send(PacketS2C::ServerError { error: e }).await?; } } } From aad1435dcc612ee26e97e277ac009f41c1cba445 Mon Sep 17 00:00:00 2001 From: Ian Parra Date: Sat, 31 May 2025 22:45:42 -0400 Subject: [PATCH 04/36] log: set timestamp to env logger --- server/src/main.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main.rs b/server/src/main.rs index c86771f..74ffdb9 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -7,7 +7,7 @@ pub mod signaling; #[tokio::main] async fn main() -> anyhow::Result<()> { - pretty_env_logger::init(); + pretty_env_logger::init_timed(); signaling::server::launch("0.0.0.0:4000", Box::new(move |token| { Box::pin(async move { use signaling::server::{UserCapabilities, UserInformation}; From 4490c400e6e7d73ef5099bcda04d9d4f2dcc82e0 Mon Sep 17 00:00:00 2001 From: Ian Parra Date: Sun, 1 Jun 2025 01:10:16 -0400 Subject: [PATCH 05/36] fix: room send event function is public log: Incoming messages --- server/src/signaling/client.rs | 11 +++++++---- volcano-sfu/src/rtc/peer.rs | 2 +- volcano-sfu/src/rtc/room.rs | 2 +- 3 files changed, 9 insertions(+), 6 deletions(-) diff --git a/server/src/signaling/client.rs b/server/src/signaling/client.rs index b24ed47..89f3641 100644 --- a/server/src/signaling/client.rs +++ b/server/src/signaling/client.rs @@ -61,9 +61,14 @@ impl Client { let ws_worker = async { // Read incoming messages while let Some(msg) = read.try_next().await? { - debug!("Websocket message received."); + debug!("[Incoming] Message received."); match PacketC2S::from(msg) { - Ok(packet) => self.handle_message(packet, &write).await?, + Ok(packet) => { + info!("[Incoming] C->S: {:?}", packet); + let result = self.handle_message(packet, &write).await?; + debug!("[Incoming] Done!"); + result + }, Err(e) => { match e { ServerError::UnknownRequest => write @@ -125,8 +130,6 @@ impl Client { /// Handle incoming packet async fn handle_message(&mut self, packet: PacketC2S, write: &Sender) -> Result<()> { - info!("C->S: {:?}", packet); - let peer = self.peer.clone(); match packet { PacketC2S::Answer { description } => peer.set_remote_description(description).await, diff --git a/volcano-sfu/src/rtc/peer.rs b/volcano-sfu/src/rtc/peer.rs index e2ee8a9..7d769fb 100644 --- a/volcano-sfu/src/rtc/peer.rs +++ b/volcano-sfu/src/rtc/peer.rs @@ -309,7 +309,7 @@ impl Peer { pub async fn set_remote_description(&self, sdp: RTCSessionDescription) -> Result<()> { if let Some(subscriber) = &*self.subscriber.lock().await { - info!("PeerLocal {} sets remote description", self.id); + info!("[Peer {}] sets remote description", self.id); subscriber.set_remote_description(sdp).await?; self.remote_answer_pending.store(false, Ordering::Relaxed); diff --git a/volcano-sfu/src/rtc/room.rs b/volcano-sfu/src/rtc/room.rs index 2939100..28a5b0b 100644 --- a/volcano-sfu/src/rtc/room.rs +++ b/volcano-sfu/src/rtc/room.rs @@ -491,7 +491,7 @@ impl Room { // TODO: stop the RTP sender thread and drop } - async fn send_room_event(&self, event: RoomEvent) { + pub async fn send_room_event(&self, event: RoomEvent) { if let Ok(payload) = serde_json::to_string(&event) { info!("[Room {}] Sending room event: {}", self.id, payload); for peer in self.peers.iter() { From 611638e24f3bd36e69e969b2618fff6481043b4c Mon Sep 17 00:00:00 2001 From: Ian Parra Date: Sun, 1 Jun 2025 01:11:01 -0400 Subject: [PATCH 06/36] fix: websocket connection does not deadlock on remote description --- volcano-sfu/src/rtc/peer/subscriber.rs | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/volcano-sfu/src/rtc/peer/subscriber.rs b/volcano-sfu/src/rtc/peer/subscriber.rs index f3ba769..e03db73 100644 --- a/volcano-sfu/src/rtc/peer/subscriber.rs +++ b/volcano-sfu/src/rtc/peer/subscriber.rs @@ -268,14 +268,16 @@ impl Subscriber { pub async fn set_remote_description(&self, sdp: RTCSessionDescription) -> Result<()> { self.pc.set_remote_description(sdp).await?; - let candidates = self.candidates.lock().await; + let mut candidates = self.candidates.lock().await; + + info!("[Subscriber {}] ICE candidates ({})", self.id, candidates.len()); for candidate in &*candidates { - self.pc.add_ice_candidate(candidate.clone()).await?; + if let Err(err) = self.pc.add_ice_candidate(candidate.clone()).await { + warn!("add_ice_candidate error: {}", err); + }; } - - self.candidates.lock().await.clear(); - - info!("Answer accepted"); + + candidates.clear(); Ok(()) } } From f831a77ad763cffe27329193f2a0a228e50dc2c7 Mon Sep 17 00:00:00 2001 From: Ian Parra Date: Sun, 1 Jun 2025 01:24:01 -0400 Subject: [PATCH 07/36] fix: send remove track though datachannel --- volcano-sfu/src/rtc/room.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/volcano-sfu/src/rtc/room.rs b/volcano-sfu/src/rtc/room.rs index 28a5b0b..7f6d9b1 100644 --- a/volcano-sfu/src/rtc/room.rs +++ b/volcano-sfu/src/rtc/room.rs @@ -419,7 +419,8 @@ impl Room { self.close_track(id); } - self.publish(RoomEvent::RemoveTrack { + //self.publish(RoomEvent::RemoveTrack { + self.send_room_event(RoomEvent::RemoveTrack { room: self.id.clone(), removed_tracks, }) From bb792afec8e4850e1f4532637c7fa0b431edbaea Mon Sep 17 00:00:00 2001 From: Ian Parra Date: Sun, 1 Jun 2025 01:39:49 -0400 Subject: [PATCH 08/36] chore: bump to 0.3.5 --- Cargo.lock | 2 +- volcano-sfu/Cargo.toml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index d331488..dd4c5c3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1943,7 +1943,7 @@ checksum = "49874b5167b65d7193b8aba1567f5c7d93d001cafc34600cee003eda787e483f" [[package]] name = "volcano-sfu" -version = "0.3.4" +version = "0.3.5" dependencies = [ "anyhow", "async-trait", diff --git a/volcano-sfu/Cargo.toml b/volcano-sfu/Cargo.toml index 5ae6200..add9f2f 100644 --- a/volcano-sfu/Cargo.toml +++ b/volcano-sfu/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "volcano-sfu" -version = "0.3.4" +version = "0.3.5" edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html From b1021ddca94b41478befd3e63f21dfca9651904e Mon Sep 17 00:00:00 2001 From: Ian Parra Date: Sun, 1 Jun 2025 18:21:35 -0400 Subject: [PATCH 09/36] fix: ignore build outputs & use distroless image --- .dockerignore | 13 +++++++++++++ Dockerfile | 2 +- 2 files changed, 14 insertions(+), 1 deletion(-) create mode 100644 .dockerignore diff --git a/.dockerignore b/.dockerignore new file mode 100644 index 0000000..571cd0f --- /dev/null +++ b/.dockerignore @@ -0,0 +1,13 @@ +# GIT +.git +.gitignore + +# Build +/target + +# IDE +.vscode + +# Environment +.env +.env.example \ No newline at end of file diff --git a/Dockerfile b/Dockerfile index c3e091c..db53377 100644 --- a/Dockerfile +++ b/Dockerfile @@ -21,7 +21,7 @@ RUN cargo install --path server RUN rm */src/*.rs target/release/deps/volcano* # Bundle -FROM debian:bookworm +FROM gcr.io/distroless/cc-debian12 COPY --from=build /usr/local/cargo/bin/server ./volcano-server COPY config.toml ./config.toml From 19b3c4bd515163b6109e373eff1f712a86d04a83 Mon Sep 17 00:00:00 2001 From: Ian Parra Date: Sun, 1 Jun 2025 18:31:15 -0400 Subject: [PATCH 10/36] fix: dont try to send a final message --- server/src/signaling/server.rs | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/server/src/signaling/server.rs b/server/src/signaling/server.rs index 60a4d4b..19eb89f 100644 --- a/server/src/signaling/server.rs +++ b/server/src/signaling/server.rs @@ -86,12 +86,7 @@ async fn accept_connection(stream: TcpStream, auth: Arc, w: Arc Date: Sun, 1 Jun 2025 19:03:58 -0400 Subject: [PATCH 11/36] fix: better representation for room events --- volcano-sfu/src/rtc/room.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/volcano-sfu/src/rtc/room.rs b/volcano-sfu/src/rtc/room.rs index 7f6d9b1..2133c98 100644 --- a/volcano-sfu/src/rtc/room.rs +++ b/volcano-sfu/src/rtc/room.rs @@ -23,7 +23,8 @@ use crate::track::{audio_observer::AudioObserver, router::LocalRouter}; use super::peer::Peer; /// Room event which indicates something happened to a peer -#[derive(Debug, Clone, Serialize, Deserialize)] +#[derive(Debug, Clone, Serialize)] +#[serde(tag = "type", content = "data")] pub enum RoomEvent { Create(String), Close(String), From 60aa62eee4de609c58f6041defd1c973b500175d Mon Sep 17 00:00:00 2001 From: Ian Parra Date: Mon, 2 Jun 2025 21:45:44 -0400 Subject: [PATCH 12/36] fix: worker should not end by an internal error --- server/src/signaling/client.rs | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/server/src/signaling/client.rs b/server/src/signaling/client.rs index 89f3641..830cbe5 100644 --- a/server/src/signaling/client.rs +++ b/server/src/signaling/client.rs @@ -65,9 +65,11 @@ impl Client { match PacketC2S::from(msg) { Ok(packet) => { info!("[Incoming] C->S: {:?}", packet); - let result = self.handle_message(packet, &write).await?; - debug!("[Incoming] Done!"); - result + let result = self.handle_message(packet, &write).await; + match result { + Ok(_) => debug!("[Incoming] Done!"), + Err(e) => error!("[Incoming] Error: {e}"), + } }, Err(e) => { match e { From 3e8f3a1dec127c9b2746fd41fb4a51c357ca53f8 Mon Sep 17 00:00:00 2001 From: Ian Parra Date: Mon, 2 Jun 2025 21:58:09 -0400 Subject: [PATCH 13/36] chore: use reference to parse and reuse msg (less cloning) --- server/src/signaling/client.rs | 7 ++++++- server/src/signaling/packets.rs | 4 ++-- server/src/signaling/server.rs | 2 +- 3 files changed, 9 insertions(+), 4 deletions(-) diff --git a/server/src/signaling/client.rs b/server/src/signaling/client.rs index 830cbe5..b57b207 100644 --- a/server/src/signaling/client.rs +++ b/server/src/signaling/client.rs @@ -62,7 +62,8 @@ impl Client { // Read incoming messages while let Some(msg) = read.try_next().await? { debug!("[Incoming] Message received."); - match PacketC2S::from(msg) { + + match PacketC2S::from(&msg) { Ok(packet) => { info!("[Incoming] C->S: {:?}", packet); let result = self.handle_message(packet, &write).await; @@ -78,6 +79,10 @@ impl Client { error: e.to_string(), }) .await?, + ServerError::UnproccesableEntity => { + debug!("[Incoming] Message is not text."); + debug!("msg -> {}", msg.into_text().unwrap_or_else(|e| e.to_string())) + } _ => { debug!("Websocket message is not a packet."); error!("Error message not handled: {e}"); diff --git a/server/src/signaling/packets.rs b/server/src/signaling/packets.rs index 16ccd51..f3b9b4d 100644 --- a/server/src/signaling/packets.rs +++ b/server/src/signaling/packets.rs @@ -200,9 +200,9 @@ impl std::fmt::Display for MediaType { impl PacketC2S { /// Create a packet from incoming Message - pub fn from(message: Message) -> Result { + pub fn from(message: &Message) -> Result { if let Message::Text(text) = message { - match serde_json::from_str(&text) { + match serde_json::from_str(text) { Ok(packet) => Ok(packet), Err(e) => { error!("Tried to parse packet: {text}"); diff --git a/server/src/signaling/server.rs b/server/src/signaling/server.rs index 19eb89f..cc1748a 100644 --- a/server/src/signaling/server.rs +++ b/server/src/signaling/server.rs @@ -99,7 +99,7 @@ async fn handle_connection( // Wait until valid packet is sent let mut client: Option = None; while let Some(msg) = read.next().await { - match PacketC2S::from(msg?) { + match PacketC2S::from(&msg?) { Ok(packet) => match packet { PacketC2S::Connect { id, From 496b4557e53888f83f6e38a1ecb9e962d1a3e1d1 Mon Sep 17 00:00:00 2001 From: Ian Parra Date: Mon, 2 Jun 2025 23:20:44 -0400 Subject: [PATCH 14/36] fix: clean up peers from room --- Cargo.lock | 4 +- server/Cargo.toml | 2 +- server/src/signaling/client.rs | 105 +++++++++++++++++++++----------- server/src/signaling/packets.rs | 2 + volcano-sfu/Cargo.toml | 2 +- volcano-sfu/src/rtc/peer.rs | 7 +-- volcano-sfu/src/rtc/room.rs | 4 +- 7 files changed, 78 insertions(+), 48 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index dd4c5c3..cd343a6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1481,7 +1481,7 @@ dependencies = [ [[package]] name = "server" -version = "0.1.0" +version = "0.1.1" dependencies = [ "anyhow", "async-trait", @@ -1943,7 +1943,7 @@ checksum = "49874b5167b65d7193b8aba1567f5c7d93d001cafc34600cee003eda787e483f" [[package]] name = "volcano-sfu" -version = "0.3.5" +version = "0.3.6" dependencies = [ "anyhow", "async-trait", diff --git a/server/Cargo.toml b/server/Cargo.toml index 5aadeea..d94d9de 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "server" -version = "0.1.0" +version = "0.1.1" edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html diff --git a/server/src/signaling/client.rs b/server/src/signaling/client.rs index b57b207..6144b7a 100644 --- a/server/src/signaling/client.rs +++ b/server/src/signaling/client.rs @@ -2,7 +2,8 @@ use std::sync::Arc; use anyhow::Result; use futures::{ - future::{select, Either}, pin_mut, FutureExt, TryStreamExt + future::{select, Either}, + pin_mut, FutureExt, TryStreamExt, }; use postage::stream::Stream; use volcano_sfu::rtc::{ @@ -10,7 +11,12 @@ use volcano_sfu::rtc::{ peer::{JoinConfig, Peer}, room::{Room, RoomSignal}, }; -use webrtc::{ice_transport::ice_candidate::RTCIceCandidateInit, peer_connection::sdp::session_description::RTCSessionDescription}; +use webrtc::{ + ice_transport::{ + ice_candidate::RTCIceCandidateInit, ice_connection_state::RTCIceConnectionState, + }, + peer_connection::sdp::session_description::RTCSessionDescription, +}; use super::{ packets::{PacketC2S, PacketS2C, ServerError}, @@ -71,24 +77,27 @@ impl Client { Ok(_) => debug!("[Incoming] Done!"), Err(e) => error!("[Incoming] Error: {e}"), } - }, - Err(e) => { - match e { - ServerError::UnknownRequest => write - .send(PacketS2C::Error { - error: e.to_string(), - }) - .await?, - ServerError::UnproccesableEntity => { - debug!("[Incoming] Message is not text."); - debug!("msg -> {}", msg.into_text().unwrap_or_else(|e| e.to_string())) - } - _ => { - debug!("Websocket message is not a packet."); - error!("Error message not handled: {e}"); - } - } } + Err(e) => match e { + ServerError::UnknownRequest => { + write + .send(PacketS2C::Error { + error: e.to_string(), + }) + .await? + } + ServerError::UnproccesableEntity => { + debug!("[Incoming] Message is not text."); + debug!( + "msg -> {}", + msg.into_text().unwrap_or_else(|e| e.to_string()) + ) + } + _ => { + debug!("Websocket message is not a packet."); + error!("Error message not handled: {e}"); + } + }, } } @@ -97,7 +106,7 @@ impl Client { Ok(()) } .fuse(); - + let room_worker = async { debug!("Created room listener"); let mut listener = signal.listener(); @@ -110,7 +119,8 @@ impl Client { // TODO: maybe throw an error for listener being closed? info!("Closing room listener"); anyhow::Ok(()) - }.fuse(); + } + .fuse(); // Pin futures on the stack pin_mut!(ws_worker, room_worker); @@ -124,15 +134,17 @@ impl Client { /// Clean up after ourselves by disconnecting from the room, /// closing the peer connection and removing tracks. pub async fn lifecycle_clean_up(&mut self) -> Result<()> { - info!("User {} disconnected", self.user.id); + let user_id = &self.user.id; + info!("User {} disconnected", user_id); if let Some(room) = &self.room { - room.unsubscribe_signal(&self.user.id).await; - room.remove_user(&self.user.id).await; + room.unsubscribe_signal(user_id).await; + room.remove_peer(user_id).await; + room.remove_user(user_id).await; if room.is_empty() { - room.close().await; + debug!("Room {} is empty. Should clean up?", room.id); } } - self.peer.clean_up().await + Ok(()) } /// Handle incoming packet @@ -142,7 +154,6 @@ impl Client { PacketC2S::Answer { description } => peer.set_remote_description(description).await, PacketC2S::Connect { .. } => Err(ServerError::AlreadyConnected.into()), PacketC2S::Continue { .. } => { - // TODO: Add Continue event Ok(()) } @@ -160,7 +171,7 @@ impl Client { match &self.room { Some(room) => { // Close all peers - self.peer.clean_up().await?; + room.remove_peer(&self.user.id).await; // Remove user room.remove_user(&self.user.id).await; if room.is_empty() { @@ -173,12 +184,10 @@ impl Client { } } PacketC2S::Remove { removed_tracks: _ } => Ok(()), - PacketC2S::Offer {id, description } => { + PacketC2S::Offer { id, description } => { Self::handle_offer(peer, write.clone(), description, id).await } - PacketC2S::Trickle { candidate, target } => { - peer.trickle(candidate, target).await - } + PacketC2S::Trickle { candidate, target } => peer.trickle(candidate, target).await, } } @@ -190,12 +199,12 @@ impl Client { cfg: JoinConfig, id: u32, ) -> Result<()> { - // Signaling was experimental. // room.subscribe_signal(self.signal.clone()).await; let peer = self.peer.clone(); let write_out_1 = write.clone(); let write_out_2 = write.clone(); + let write_out_3 = write.clone(); peer.on_offer(Box::new(move |offer| { let write_in = write_out_1.clone(); Box::pin(async move { @@ -221,12 +230,30 @@ impl Client { )) .await; let peer_id = peer.id().clone(); - peer.register_on_ice_connection_state_change(Box::new( move |state| { + peer.register_on_ice_connection_state_change(Box::new(move |state| { let peer_id_in = peer_id.clone(); + let write_in = write_out_3.clone(); Box::pin(async move { - debug!("[Publisher {}] ICE connection state changed to: {}", peer_id_in, state); + debug!( + "[Publisher {}] ICE connection state changed to: {}", + peer_id_in, state + ); + match state { + RTCIceConnectionState::Failed => { + if let Err(err) = write_in + .send(PacketS2C::ServerError { + error: ServerError::PeerConnectionFailed, + }) + .await + { + error!("Write failed: {err}"); + }; + } + _ => {} + } }) - })).await; + })) + .await; if let Err(err) = peer.join(room.id.clone(), cfg).await { error!("join error: {}", err); @@ -245,7 +272,11 @@ impl Client { } Err(err) => { // Client should know error - write.send(PacketS2C::Error { error: err.to_string() }).await?; + write + .send(PacketS2C::Error { + error: err.to_string(), + }) + .await?; error!("answer error: {}", err); } }; diff --git a/server/src/signaling/packets.rs b/server/src/signaling/packets.rs index f3b9b4d..2e44d65 100644 --- a/server/src/signaling/packets.rs +++ b/server/src/signaling/packets.rs @@ -179,6 +179,8 @@ pub enum ServerError { NotConnected, #[error("Media type already has an existing track!")] MediaTypeSatisfied, + #[error("Peer connection failed!")] + PeerConnectionFailed, #[error("Bad Request. Reason: {reason}")] BadRequest { reason: String }, #[error("Request type is unknown.")] diff --git a/volcano-sfu/Cargo.toml b/volcano-sfu/Cargo.toml index add9f2f..7beef38 100644 --- a/volcano-sfu/Cargo.toml +++ b/volcano-sfu/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "volcano-sfu" -version = "0.3.5" +version = "0.3.6" edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html diff --git a/volcano-sfu/src/rtc/peer.rs b/volcano-sfu/src/rtc/peer.rs index 7d769fb..41ed726 100644 --- a/volcano-sfu/src/rtc/peer.rs +++ b/volcano-sfu/src/rtc/peer.rs @@ -119,7 +119,7 @@ impl Peer { } } /// Clean up any open connections - pub async fn clean_up(&self) -> Result<()> { + pub async fn clean_up(&self) { // Takes out mutex peers let subscriber = self.subscriber.lock().await.take(); let publisher = self.publisher.lock().await.take(); @@ -130,11 +130,6 @@ impl Peer { if let Some(p) = publisher { p.close().await; } - - Ok(()) - - // TODO: find out if tracks are removed too - //self.pc.close().await.map_err(Into::into) } diff --git a/volcano-sfu/src/rtc/room.rs b/volcano-sfu/src/rtc/room.rs index 2133c98..bcfc5e3 100644 --- a/volcano-sfu/src/rtc/room.rs +++ b/volcano-sfu/src/rtc/room.rs @@ -323,7 +323,9 @@ impl Room { } pub async fn remove_peer(&self, peer_id: &str) -> usize { - self.peers.remove(peer_id); + if let Some((_, peer)) = self.peers.remove(peer_id) { + peer.clean_up().await; + }; let peer_count = self.peers.len(); peer_count From 7e9150e0b3bb3bcb96052ede8e5f4a245747ba78 Mon Sep 17 00:00:00 2001 From: Ian Parra Date: Tue, 3 Jun 2025 15:40:20 -0400 Subject: [PATCH 15/36] fix: insert user with no tracks --- server/src/signaling/client.rs | 6 ++++ server/src/signaling/packets.rs | 3 ++ volcano-sfu/src/rtc/peer.rs | 3 ++ volcano-sfu/src/rtc/room.rs | 63 +++++++++++++++++++-------------- 4 files changed, 49 insertions(+), 26 deletions(-) diff --git a/server/src/signaling/client.rs b/server/src/signaling/client.rs index 6144b7a..788d047 100644 --- a/server/src/signaling/client.rs +++ b/server/src/signaling/client.rs @@ -280,6 +280,12 @@ impl Client { error!("answer error: {}", err); } }; + + // Send room info + let room_info = room.get_room_info(); + if let Err(err) = write.send(PacketS2C::RoomInfo { room: room_info }).await { + error!("send room info error: {}", err); + }; Ok(()) } diff --git a/server/src/signaling/packets.rs b/server/src/signaling/packets.rs index 2e44d65..4564aca 100644 --- a/server/src/signaling/packets.rs +++ b/server/src/signaling/packets.rs @@ -116,6 +116,9 @@ pub enum PacketS2C { /// IDs of tracks that are no longer being produced removed_tracks: Vec, }, + RoomInfo { + room: RoomInfo, + }, /// Offer (for client subscriber) Offer { description: RTCSessionDescription, diff --git a/volcano-sfu/src/rtc/peer.rs b/volcano-sfu/src/rtc/peer.rs index 41ed726..3dd580b 100644 --- a/volcano-sfu/src/rtc/peer.rs +++ b/volcano-sfu/src/rtc/peer.rs @@ -277,6 +277,9 @@ impl Peer { "[Peer {}] Adds to room {}", id, room_id ); + + // Send user join event with no tracks + room.join_user(id.to_owned(), Vec::new()).await; if !cfg.no_subscribe { room.subscribe(self.clone()).await; diff --git a/volcano-sfu/src/rtc/room.rs b/volcano-sfu/src/rtc/room.rs index bcfc5e3..924e46c 100644 --- a/volcano-sfu/src/rtc/room.rs +++ b/volcano-sfu/src/rtc/room.rs @@ -1,7 +1,7 @@ -use std::sync::{ +use std::{collections::BTreeMap, fmt::Debug, sync::{ atomic::{AtomicBool, Ordering}, Arc, -}; +}}; use dashmap::{DashMap, DashSet}; use postage::{ @@ -18,6 +18,8 @@ use webrtc::{ }, track::track_local::{TrackLocal, track_local_static_rtp::TrackLocalStaticRTP}, }; +use serde::Serialize; + use crate::track::{audio_observer::AudioObserver, router::LocalRouter}; use super::peer::Peer; @@ -33,6 +35,7 @@ pub enum RoomEvent { room_id: String, }, DataChannelMessage(Vec), + RoomInfo(RoomInfo), RemoveTrack { removed_tracks: Vec, room: String, @@ -48,6 +51,12 @@ pub enum RoomEvent { }, } +#[derive(Clone, Debug, Serialize)] +pub struct RoomInfo { + pub id: String, + pub users: BTreeMap>, +} + /// Room consisting of clients which can communicate with one another #[derive(Debug)] pub struct Room { @@ -60,17 +69,12 @@ pub struct Room { /// Signalers for this room signalers: Arc>>>, sender: Sender, - participants: DashSet, + //participants: DashSet, pub audio_observer: Arc>, user_tracks: DashMap>, peers: DashMap>, tracks: DashMap>, } -#[derive(Debug, Serialize)] -pub struct RoomInfo { - id: String, - participants: Vec, -} lazy_static! { static ref ROOMS: DashMap> = DashMap::new(); @@ -89,7 +93,6 @@ impl Room { signalers: Default::default(), labels: Default::default(), peers: Default::default(), - participants: Default::default(), audio_observer: Arc::new(Mutex::new(AudioObserver::new(100, 80, 50))), user_tracks: Default::default(), tracks: Default::default(), @@ -180,19 +183,21 @@ impl Room { pub(crate) async fn add_api_channel(self: &Arc, id: &str) { if let Some(peer) = self.get_peer(id).await { let room_out = self.clone(); - let user_id_out = id.to_owned(); + //let user_id_out = id.to_owned(); if peer.subscriber().await.is_none() { error!("add_api_channel No subscriber available"); return; } let subscriber = peer.subscriber().await.unwrap(); - let user_tracks = self.user_tracks.get(id).map(|tracks| tracks.clone()); + //let user_tracks = self.user_tracks.get(id).map(|tracks| tracks.clone()); subscriber.api_channel().on_open(Box::new( move || { let room_in = room_out.clone(); - let user_id_in = user_id_out.clone(); - let tracks_in = user_tracks.unwrap_or_default(); + //let user_id_in = user_id_out.clone(); + //let tracks_in = user_tracks.unwrap_or_default(); Box::pin(async move { - room_in.join_user(user_id_in, tracks_in).await + //room_in.join_user(user_id_in, tracks_in).await; + info!("[Room {}] API channel opened", room_in.id); + warn!("DataChannelOpen event not implemented"); }) })); let room_id = self.id.clone(); @@ -236,10 +241,7 @@ impl Room { let mut available_rooms = Vec::new(); for id in ids { if let Some(room) = ROOMS.get(id) { - available_rooms.push(RoomInfo { - id: id.clone(), - participants: room.participants.clone().into_iter().collect(), - }) + available_rooms.push(room.get_room_info()) } } available_rooms @@ -351,12 +353,20 @@ impl Room { user_id: id.clone(), user_tracks: tracks.clone(), }; - self.send_room_event(ev).await; + self.send_message(ev).await; - // Add tracks to map + // Insert tracks self.user_tracks.insert(id, tracks); } + pub fn get_room_info(&self,) -> RoomInfo { + let user_tracks = self.user_tracks.clone(); + let mut users = BTreeMap::new(); + // Serialize user tracks + user_tracks.into_iter().for_each(|(key, value)| {users.insert(key, value);}); + return RoomInfo { id: self.id.clone(), users }; + } + pub async fn subscribe(self: &Arc, peer: Arc) { info!("Subscribing a new peer"); @@ -423,7 +433,7 @@ impl Room { } //self.publish(RoomEvent::RemoveTrack { - self.send_room_event(RoomEvent::RemoveTrack { + self.send_message(RoomEvent::RemoveTrack { room: self.id.clone(), removed_tracks, }) @@ -431,7 +441,7 @@ impl Room { } // Let everyone know we left - self.send_room_event(RoomEvent::UserLeft { + self.send_message(RoomEvent::UserLeft { room_id: self.id.clone(), user_id: id.to_owned(), }) @@ -459,7 +469,7 @@ impl Room { pub async fn remove_track(&self, id: String) { self.close_track(&id); - self.send_room_event(RoomEvent::RemoveTrack { removed_tracks: vec![id], room: self.id.clone() }).await; + self.send_message(RoomEvent::RemoveTrack { removed_tracks: vec![id], room: self.id.clone() }).await; } pub async fn publish_track( @@ -495,8 +505,9 @@ impl Room { // TODO: stop the RTP sender thread and drop } - pub async fn send_room_event(&self, event: RoomEvent) { - if let Ok(payload) = serde_json::to_string(&event) { + /// Send a serializable message to all peers' subscribers + pub async fn send_message(&self, msg: Message) where Message: Serialize + Debug { + if let Ok(payload) = serde_json::to_string(&msg) { info!("[Room {}] Sending room event: {}", self.id, payload); for peer in self.peers.iter() { if let Some(subscriber) = peer.subscriber().await { @@ -506,7 +517,7 @@ impl Room { } } } else { - error!("Error parsing {:?}", event); + error!("Error parsing {:?}", msg); }; } } From 815c6b29b8b1671d73993a9f4c14a6d325388b1e Mon Sep 17 00:00:00 2001 From: Ian Parra Date: Thu, 5 Jun 2025 22:45:40 -0400 Subject: [PATCH 16/36] more printing --- volcano-sfu/src/turn.rs | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/volcano-sfu/src/turn.rs b/volcano-sfu/src/turn.rs index b988be7..4064696 100644 --- a/volcano-sfu/src/turn.rs +++ b/volcano-sfu/src/turn.rs @@ -56,9 +56,10 @@ impl AuthHandler for CustomAuthHandler { ) -> Result, Error> { debug!("realm val: {}", realm); if let Some(val) = self.users_map.get(&username.to_string()) { + debug!("username accepted: {}", username); return Ok(val.clone()); } - + error!("Invalid username"); Err(Error::ErrNilConn) } } @@ -68,7 +69,7 @@ pub async fn init_turn_server( auth: Option>, ) -> anyhow::Result { let conn = Arc::new(UdpSocket::bind(conf.address.clone()).await?); - info!("UDP listening {}...", conn.local_addr()?); + info!("TURN server listening {}...", conn.local_addr()?); let mut new_auth: Option> = auth; @@ -114,7 +115,7 @@ pub async fn init_turn_server( relay_addr_generator: Box::new(RelayAddressGeneratorRanges { min_port, max_port, - max_retries: 1, + max_retries: 20, relay_address: IpAddr::from_str(addr[0])?, address: "0.0.0.0".to_owned(), net: Arc::new(Net::new(Some(NetConfig::default()))), @@ -129,5 +130,6 @@ pub async fn init_turn_server( alloc_close_notify: None, }) .await?; + info!("TURN server started"); Ok(turn_server) } From 509c291cb1eedd0939a1c45d2fe2bffa62226121 Mon Sep 17 00:00:00 2001 From: Ian Parra Date: Thu, 5 Jun 2025 22:47:28 -0400 Subject: [PATCH 17/36] experimental: allow negotiate with offer options --- volcano-sfu/src/buffer/buffer.rs | 4 +- volcano-sfu/src/rtc/peer.rs | 30 +++++++++++--- volcano-sfu/src/rtc/peer/subscriber.rs | 31 +++++++++++---- volcano-sfu/src/rtc/room.rs | 52 ++++++++++++++++++++----- volcano-sfu/src/track/audio_observer.rs | 9 ++++- volcano-sfu/src/track/router.rs | 13 +++++-- 6 files changed, 109 insertions(+), 30 deletions(-) diff --git a/volcano-sfu/src/buffer/buffer.rs b/volcano-sfu/src/buffer/buffer.rs index 2cf6ef7..7029ea4 100644 --- a/volcano-sfu/src/buffer/buffer.rs +++ b/volcano-sfu/src/buffer/buffer.rs @@ -37,7 +37,7 @@ pub type OnFeedbackCallBackFn = Box< >; pub type OnAudioLevelFn = - Box Pin + Send + 'static>>) + Send + Sync>; + Box Pin + Send + 'static>>) + Send + Sync>; pub enum BufferPacketType { RTPBufferPacket = 1, RTCPBufferPacket = 2, @@ -510,7 +510,7 @@ impl AtomicBuffer { if let Ok(data) = rv { let mut handler = buffer.on_audio_level.lock().await; if let Some(f) = &mut *handler { - f(data.level).await; + f(data.voice, data.level).await; } } } diff --git a/volcano-sfu/src/rtc/peer.rs b/volcano-sfu/src/rtc/peer.rs index 3dd580b..1cbdfc0 100644 --- a/volcano-sfu/src/rtc/peer.rs +++ b/volcano-sfu/src/rtc/peer.rs @@ -15,7 +15,7 @@ use webrtc::{ ice_candidate::{RTCIceCandidate, RTCIceCandidateInit}, ice_connection_state::RTCIceConnectionState }, peer_connection::{ - configuration::RTCConfiguration, sdp::session_description::RTCSessionDescription, signaling_state::RTCSignalingState + configuration::RTCConfiguration, offer_answer_options::RTCOfferOptions, sdp::session_description::RTCSessionDescription, signaling_state::RTCSignalingState }, }; @@ -160,13 +160,19 @@ impl Peer { inner_subscriber.no_auto_subscribe = cfg.no_auto_subscribe; let subscriber = Arc::new(inner_subscriber); let remote_answer_pending_out = self.remote_answer_pending.clone(); + //let remote_answer_pending_out_2 = self.remote_answer_pending.clone(); let negotiation_pending_out = self.negotiation_pending.clone(); + //let negotiation_pending_out_2 = self.negotiation_pending.clone(); let closed_out = self.closed.clone(); + //let closed_out_2 = self.closed.clone(); let sub = Arc::clone(&subscriber); + //let sub_2 = Arc::clone(&subscriber); let on_offer_handler_out = self.on_offer_fn.clone(); + //let on_offer_handler_out_2 = self.on_offer_fn.clone(); let id_clone_out = id.clone(); + //let id_clone_out_2 = id.clone(); subscriber - .register_on_negociate(Box::new(move || { + .register_on_negotiate(Box::new(move |offer_options: Option| { let remote_answer_pending_in = remote_answer_pending_out.clone(); let negotiation_pending_in = negotiation_pending_out.clone(); let closed_in = closed_out.clone(); @@ -174,12 +180,14 @@ impl Peer { let sub_in = sub.clone(); let on_offer_handler_in = on_offer_handler_out.clone(); Box::pin(async move { + debug!("Start negotiation"); if remote_answer_pending_in.load(Ordering::Relaxed) { (*negotiation_pending_in).store(true, Ordering::Relaxed); + debug!("Negotiation set to pending. Reason: Remote answer pending"); return Ok(()); } - let offer = sub_in.create_offer().await?; + let offer = sub_in.create_offer(offer_options).await?; (*remote_answer_pending_in).store(true, Ordering::Relaxed); if let Some(on_offer) = &mut *on_offer_handler_in.lock().await { @@ -193,6 +201,7 @@ impl Peer { }) })) .await; + let on_ice_candidate_out = self.on_ice_candidate_fn.clone(); let closed_out_ = self.closed.clone(); subscriber.register_on_ice_candidate(Box::new(move |candidate| { @@ -223,7 +232,7 @@ impl Peer { if let Some(sub) = &*self.subscriber.lock().await { sub.add_data_channel(&dc.config.label).await?; info!("[Subscriber {}] Trying to offer...", sub.id); - sub.create_offer().await?; + sub.create_offer(None).await?; } } } @@ -314,8 +323,19 @@ impl Peer { if self.negotiation_pending.load(Ordering::Relaxed) { self.negotiation_pending.store(false, Ordering::Relaxed); info!("Subscriber negotiate"); - subscriber.negotiate().await?; + subscriber.negotiate(None).await?; } + + // set renegotation method for subscriber + let sub = Arc::clone(&subscriber); + subscriber.pc.on_negotiation_needed(Box::new(move || { + let sub_in = sub.clone(); + Box::pin(async move { + info!("Start renegotiation"); + if let Err(err) = sub_in.negotiate(Some( RTCOfferOptions { voice_activity_detection: true, ice_restart: true })).await { + error!("renegotiate err: {}", err); + } + })})); } else { return Err(Error::ErrNoTransportEstablished.into()); } diff --git a/volcano-sfu/src/rtc/peer/subscriber.rs b/volcano-sfu/src/rtc/peer/subscriber.rs index e03db73..6720990 100644 --- a/volcano-sfu/src/rtc/peer/subscriber.rs +++ b/volcano-sfu/src/rtc/peer/subscriber.rs @@ -7,6 +7,7 @@ use tokio::time::{sleep, Duration}; use webrtc::data_channel::data_channel_init::RTCDataChannelInit; use webrtc::ice_transport::ice_connection_state::RTCIceConnectionState; use webrtc::ice_transport::ice_gatherer::OnLocalCandidateHdlrFn; +use webrtc::peer_connection::offer_answer_options::RTCOfferOptions; use webrtc::peer_connection::sdp::session_description::RTCSessionDescription; use webrtc::rtcp::source_description::SourceDescription; use webrtc::rtp_transceiver::rtp_codec::RTPCodecType; @@ -38,11 +39,14 @@ pub struct Subscriber { channels: Arc>>>, candidates: Arc>>, on_negotiate: Arc>>, + on_renegotiate: Arc>>, pub no_auto_subscribe: bool, } pub type OnNegotiateFn = - Box Pin> + Send + 'static>>) + Send + Sync>; + Box) -> Pin> + Send + 'static>>) + Send + Sync>; +pub type OnRenegotiateFn = + Box Pin> + Send + 'static>>) + Send + Sync>; impl Subscriber { pub async fn new(id: String, c: Arc) -> Result { @@ -59,6 +63,7 @@ impl Subscriber { channels: Default::default(), candidates: Default::default(), on_negotiate: Default::default(), + on_renegotiate: Default::default(), no_auto_subscribe: Default::default(), }; subscriber.on_ice_connection_state_change().await; @@ -129,8 +134,8 @@ impl Subscriber { Ok(data_channel) } - pub async fn create_offer(&self) -> Result { - let offer = self.pc.create_offer(None).await?; + pub async fn create_offer(&self, options: Option) -> Result { + let offer = self.pc.create_offer(options).await?; self.pc.set_local_description(offer.clone()).await?; Ok(offer) } @@ -170,11 +175,16 @@ impl Subscriber { self.pc.on_ice_candidate(f) } - pub async fn register_on_negociate(&self, f: OnNegotiateFn) { + pub async fn register_on_negotiate(&self, f: OnNegotiateFn) { let mut handler = self.on_negotiate.lock().await; *handler = Some(f); } + pub async fn register_on_renegotiate(&self, f: OnRenegotiateFn) { + let mut handler = self.on_renegotiate.lock().await; + *handler = Some(f); + } + pub async fn data_channel(&self, label: &String) -> Option> { self.channels.lock().await.get(label).cloned() } @@ -183,11 +193,18 @@ impl Subscriber { self.tracks.lock().await.get(stream_id).cloned() } - pub async fn negotiate(&self) -> Result<()> { - info!("Calling negotitation"); + pub async fn negotiate(&self, offer_options: Option) -> Result<()> { let mut handler = self.on_negotiate.lock().await; if let Some(f) = &mut *handler { - f().await?; + f(offer_options).await?; + } + Ok(()) + } + + pub async fn renegotiate(&self, ice_restart: bool) -> Result<()> { + let mut handler = self.on_renegotiate.lock().await; + if let Some(f) = &mut *handler { + f(ice_restart).await?; } Ok(()) } diff --git a/volcano-sfu/src/rtc/room.rs b/volcano-sfu/src/rtc/room.rs index 924e46c..6d89dbd 100644 --- a/volcano-sfu/src/rtc/room.rs +++ b/volcano-sfu/src/rtc/room.rs @@ -8,19 +8,18 @@ use postage::{ broadcast::{channel, Receiver, Sender}, sink::Sink, }; -use tokio::sync::Mutex; +use tokio::{sync::Mutex, time::{interval, Duration}}; use ulid::Ulid; use webrtc::{ - data::data_channel::DataChannel, - data_channel::{ + data::data_channel::DataChannel, data_channel::{ data_channel_message::DataChannelMessage, data_channel_state::RTCDataChannelState, RTCDataChannel, - }, track::track_local::{TrackLocal, track_local_static_rtp::TrackLocalStaticRTP}, + }, peer_connection::offer_answer_options::RTCOfferOptions, track::track_local::{track_local_static_rtp::TrackLocalStaticRTP, TrackLocal} }; use serde::Serialize; -use crate::track::{audio_observer::AudioObserver, router::LocalRouter}; +use crate::{track::{audio_observer::AudioObserver, router::LocalRouter}}; use super::peer::Peer; @@ -40,6 +39,10 @@ pub enum RoomEvent { removed_tracks: Vec, room: String, }, + VoiceActivity { + room_id: String, + stream_ids: Vec, + }, UserJoin { room_id: String, user_id: String, @@ -93,7 +96,7 @@ impl Room { signalers: Default::default(), labels: Default::default(), peers: Default::default(), - audio_observer: Arc::new(Mutex::new(AudioObserver::new(100, 80, 50))), + audio_observer: Arc::new(Mutex::new(AudioObserver::new(65, 600, 50))), user_tracks: Default::default(), tracks: Default::default(), }) @@ -172,7 +175,10 @@ impl Room { } info!("Data channel negotiation"); - if let Err(err) = subscriber.negotiate().await { + if let Err(err) = subscriber.negotiate(Some(RTCOfferOptions { + ice_restart: true, + voice_activity_detection: true, + })).await { error!("negotiate error:{}", err); } else { info!("Data channel negotiation successful"); @@ -202,11 +208,13 @@ impl Room { })); let room_id = self.id.clone(); info!("[Room {room_id}] Data channel negotiation"); - if let Err(err) = subscriber.negotiate().await { + if let Err(err) = subscriber.negotiate(None).await { error!("[Room {room_id}] negotiate error: {}", err); } else { - info!("[Room {room_id}] Data channel negotiation successful"); + info!("[Room {room_id}] Negotiation successful"); } + } else { + error!("[Room {}] Unknown peer {id}", self.id); } } @@ -367,6 +375,24 @@ impl Room { return RoomInfo { id: self.id.clone(), users }; } + async fn start_audio_observer_task(self: &Arc) { + let observer = self.audio_observer.clone(); + let mut interval = interval(Duration::from_millis(500)); + let room_out = self.clone(); + tokio::spawn(async move { + loop { + interval.tick().await; + let mut observer_in = observer.lock().await; + let streams = observer_in.calc().await; + if let Some(streams) = streams { + info!("Streams {:?}", streams); + room_out.send_message(RoomEvent::VoiceActivity { room_id: room_out.id.clone(), stream_ids: streams }).await; + } + } + }); + info!("Audio observer task started"); + } + pub async fn subscribe(self: &Arc, peer: Arc) { info!("Subscribing a new peer"); @@ -414,10 +440,16 @@ impl Room { } info!("Subscribe Negotiate"); - if let Err(err) = peer.subscriber().await.unwrap().negotiate().await { + if let Err(err) = peer.subscriber().await.unwrap().negotiate(None).await { error!("negotiate error: {}", err); } } + + // Offer API data channel to client subscriber + self.add_api_channel(&peer.id()).await; + + // Start audio observer task + self.start_audio_observer_task().await; } /// Remove a user from the room pub async fn remove_user(&self, id: &str) { diff --git a/volcano-sfu/src/track/audio_observer.rs b/volcano-sfu/src/track/audio_observer.rs index 50a6ba5..50c66ed 100644 --- a/volcano-sfu/src/track/audio_observer.rs +++ b/volcano-sfu/src/track/audio_observer.rs @@ -19,7 +19,7 @@ impl AudioObserver { /// Creates an audio observer with threshold lower than 128, an interval, and a filter from 0 to 100. /// ## Example /// ``` - /// let observer = AudioObserver::new(100, 80, 50); + /// let observer = AudioObserver::new(70, 80, 50); /// ``` pub fn new(threshold_parameter: u8, interval_parameter: i32, filter_parameter: i32) -> Self { let mut threshold: u8 = threshold_parameter; @@ -48,11 +48,12 @@ impl AudioObserver { } pub async fn remove_stream(&mut self, stream_id: &str) { + debug!("Remove stream {}", stream_id); let mut streams = self.streams.lock().await; streams.retain(|stream| !stream.id.eq(stream_id)); } - /// Observes whether d_bov is higher than threshold for target stream. + /// Observes whether d_bov is higher than threshold for target stream, then it should be ignored. /// If d_bov is lower or equal than treshold, it sums d_bov into target stream. pub async fn observe(&self, stream_id: &str, d_bov: u8) { let mut streams = self.streams.lock().await; @@ -61,6 +62,7 @@ impl AudioObserver { .find(|stream| stream.id.eq(stream_id)); if let Some(stream) = target { + // Active voice level should be lower than threshold if d_bov <= self.threshold { stream.sum += d_bov as i32; stream.total += 1; @@ -85,6 +87,7 @@ impl AudioObserver { for stream in streams.iter_mut() { if stream.total >= self.expected { + debug!("[stream {}] {}/{} (acceptable)", stream.id, stream.total, self.expected); stream_ids.push(stream.id.clone()); } @@ -94,12 +97,14 @@ impl AudioObserver { if self.previous.len() == stream_ids.len() { for idx in 0..self.previous.len() { + // If any stream id is different, reset the previous vector. if self.previous[idx] != stream_ids[idx] { self.previous = stream_ids.clone(); return Some(stream_ids); } } + // If all stream ids are the same, do not reset the previous vector. return None; } self.previous = stream_ids.clone(); diff --git a/volcano-sfu/src/track/router.rs b/volcano-sfu/src/track/router.rs index af193ee..3c2a50c 100644 --- a/volcano-sfu/src/track/router.rs +++ b/volcano-sfu/src/track/router.rs @@ -172,7 +172,7 @@ impl LocalRouter { s_in.remove_down_track(&r_in.stream_id(), &down_track_in) .await; info!("RemoveDownTrack Negotiate"); - if let Err(err) = s_in.negotiate().await { + if let Err(err) = s_in.negotiate(None).await { error!("negotiate err:{} ", err); } } @@ -214,7 +214,7 @@ impl LocalRouter { if let Some(receiver) = r { info!("Add actual downtrack to subscriber, subscriber: {}", subscriber.id); self.add_down_track(subscriber.clone(), receiver).await?; - subscriber.negotiate().await?; + subscriber.negotiate(None).await?; return Ok(()); } @@ -231,7 +231,7 @@ impl LocalRouter { for val in recs { self.add_down_track(subscriber.clone(), val.clone()).await?; } - subscriber.negotiate().await?; + subscriber.negotiate(None).await?; } Ok(()) @@ -255,13 +255,18 @@ impl LocalRouter { RTPCodecType::Audio => { let room_out = self.room.clone(); let stream_id_out = stream_id.clone(); - buffer.register_on_audio_level(Box::new(move |level| { + buffer.register_on_audio_level(Box::new(move |voice, level| { let room_in = room_out.clone(); let stream_id_in = stream_id_out.clone(); Box::pin(async move { + if !voice { + debug!("Skip observation"); + return; + } room_in.audio_observer.lock().await.observe(&stream_id_in, level).await; }) })).await; + debug!("[Room {}] add stream {} to audio observer", self.room.id, stream_id); self.room.audio_observer.lock().await.add_stream(stream_id).await; }, RTPCodecType::Video => { From 28dc2f36010c6864ffd22b37c234b3ccaa613385 Mon Sep 17 00:00:00 2001 From: Ian Parra Date: Thu, 5 Jun 2025 22:51:42 -0400 Subject: [PATCH 18/36] experimental: add api channel on room subscribe --- volcano-sfu/src/rtc/peer/publisher.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/volcano-sfu/src/rtc/peer/publisher.rs b/volcano-sfu/src/rtc/peer/publisher.rs index 82ec9ef..df35be5 100644 --- a/volcano-sfu/src/rtc/peer/publisher.rs +++ b/volcano-sfu/src/rtc/peer/publisher.rs @@ -205,7 +205,7 @@ impl Publisher { if channel.label() == super::subscriber::API_CHANNEL_LABEL { info!("[Publisher {id_in}] API data channel published from client!"); return Box::pin(async move { - room_in.add_api_channel(&id_in).await; + //room_in.add_api_channel(&id_in).await; }); } Box::pin(async move { From fb270f6141b16bfd61a9f3a1d40d6da3e6e834f3 Mon Sep 17 00:00:00 2001 From: Ian Parra Date: Thu, 5 Jun 2025 23:09:25 -0400 Subject: [PATCH 19/36] chore: bump to 0.3.7 --- Cargo.lock | 105 +++++++++++++++-------------- server/Cargo.toml | 2 +- volcano-sfu/Cargo.toml | 4 +- volcano-sfu/src/track/downtrack.rs | 4 +- 4 files changed, 60 insertions(+), 55 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index cd343a6..6a1a749 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -236,9 +236,12 @@ dependencies = [ [[package]] name = "cc" -version = "1.0.94" +version = "1.2.25" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "17f6e324229dc011159fcc089755d1e2e216a90d43a7dea6853ca740b84f35e7" +checksum = "d0fc897dc1e865cc67c0e05a836d9d3f1df3cbe442aa4a9473b18e12624a4951" +dependencies = [ + "shlex", +] [[package]] name = "ccm" @@ -771,9 +774,9 @@ dependencies = [ [[package]] name = "interceptor" -version = "0.13.0" +version = "0.14.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e5ab04c530fd82e414e40394cabe5f0ebfe30d119f10fe29d6e3561926af412e" +checksum = "1ac0781c825d602095113772e389ef0607afcb869ae0e68a590d8e0799cdcef8" dependencies = [ "async-trait", "bytes", @@ -829,9 +832,9 @@ checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646" [[package]] name = "libc" -version = "0.2.153" +version = "0.2.172" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9c198f91728a82281a64e1f4f9eeb25d82cb32a5de251c6bd1b5154d63a8e7bd" +checksum = "d750af042f7ef4f724306de029d18836c26c1765a54a6a3f094cbd23a7267ffa" [[package]] name = "lock_api" @@ -1301,24 +1304,23 @@ dependencies = [ [[package]] name = "ring" -version = "0.17.8" +version = "0.17.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c17fa4cb658e3583423e915b9f3acc01cceaee1860e33d59ebae66adc3a2dc0d" +checksum = "a4689e6c2294d81e88dc6261c768b63bc4fcdb852be6d1352498b114f61383b7" dependencies = [ "cc", "cfg-if", "getrandom", "libc", - "spin", "untrusted", "windows-sys 0.52.0", ] [[package]] name = "rtcp" -version = "0.12.0" +version = "0.13.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8306430fb118b7834bbee50e744dc34826eca1da2158657a3d6cbc70e24c2096" +checksum = "e9689528bf3a9eb311fd938d05516dd546412f9ce4fffc8acfc1db27cc3dbf72" dependencies = [ "bytes", "thiserror", @@ -1327,9 +1329,9 @@ dependencies = [ [[package]] name = "rtp" -version = "0.12.0" +version = "0.13.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e68baca5b6cb4980678713f0d06ef3a432aa642baefcbfd0f4dd2ef9eb5ab550" +checksum = "c54733451a67d76caf9caa07a7a2cec6871ea9dda92a7847f98063d459200f4b" dependencies = [ "bytes", "memchr", @@ -1366,9 +1368,9 @@ dependencies = [ [[package]] name = "rustls" -version = "0.23.15" +version = "0.23.27" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5fbb44d7acc4e873d613422379f69f237a1b141928c02f6bc6ccfddddc2d7993" +checksum = "730944ca083c1c233a75c09f199e973ca499344a2b7ba9e755c457e86fb4a321" dependencies = [ "once_cell", "ring", @@ -1380,15 +1382,18 @@ dependencies = [ [[package]] name = "rustls-pki-types" -version = "1.10.0" +version = "1.12.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "16f1201b3c9a7ee8039bcadc17b7e605e2945b27eee7631788c1bd2b0643674b" +checksum = "229a4a4c221013e7e1f1a043678c5cc39fe5171437c88fb47151a21e6f5b5c79" +dependencies = [ + "zeroize", +] [[package]] name = "rustls-webpki" -version = "0.102.8" +version = "0.103.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "64ca1bc8749bd4cf37b5ce386cc146580777b4e8572c7b97baf22c83f444bee9" +checksum = "e4a72fe2bcf7a6ac6fd7d0b9e5cb68aeb7d4c0a0271730218b3e92d43b4eb435" dependencies = [ "ring", "rustls-pki-types", @@ -1409,9 +1414,9 @@ checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" [[package]] name = "sdp" -version = "0.7.0" +version = "0.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "02a526161f474ae94b966ba622379d939a8fe46c930eebbadb73e339622599d5" +checksum = "4cd277015eada44a0bb810a4b84d3bf6e810573fa62fb442f457edf6a1087a69" dependencies = [ "rand", "substring", @@ -1520,6 +1525,12 @@ dependencies = [ "digest", ] +[[package]] +name = "shlex" +version = "1.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0fda2ff0d084019ba4d7c6f371c95d8fd75ce3524c3cb8fb653a3023f6323e64" + [[package]] name = "signal-hook-registry" version = "1.4.1" @@ -1573,12 +1584,6 @@ dependencies = [ "windows-sys 0.52.0", ] -[[package]] -name = "spin" -version = "0.9.8" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6980e8d7511241f8acf4aebddbb1ff938df5eebe98691418c4468d0b72a96a67" - [[package]] name = "spki" version = "0.7.3" @@ -1597,9 +1602,9 @@ checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f" [[package]] name = "stun" -version = "0.7.0" +version = "0.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ea256fb46a13f9204e9dee9982997b2c3097db175a9fddaa8350310d03c4d5a3" +checksum = "7dbc2bab375524093c143dc362a03fb6a1fb79e938391cdb21665688f88a088a" dependencies = [ "base64", "crc", @@ -1836,9 +1841,9 @@ dependencies = [ [[package]] name = "turn" -version = "0.9.0" +version = "0.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0044fdae001dd8a1e247ea6289abf12f4fcea1331a2364da512f9cd680bbd8cb" +checksum = "3f5aea1116456e1da71c45586b87c72e3b43164fbf435eb93ff6aa475416a9a4" dependencies = [ "async-trait", "base64", @@ -1943,7 +1948,7 @@ checksum = "49874b5167b65d7193b8aba1567f5c7d93d001cafc34600cee003eda787e483f" [[package]] name = "volcano-sfu" -version = "0.3.6" +version = "0.3.7" dependencies = [ "anyhow", "async-trait", @@ -2045,9 +2050,9 @@ dependencies = [ [[package]] name = "webrtc" -version = "0.12.0" +version = "0.13.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "30367074d9f18231d28a74fab0120856b2b665da108d71a12beab7185a36f97b" +checksum = "24bab7195998d605c862772f90a452ba655b90a2f463c850ac032038890e367a" dependencies = [ "arc-swap", "async-trait", @@ -2089,9 +2094,9 @@ dependencies = [ [[package]] name = "webrtc-data" -version = "0.10.0" +version = "0.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dec93b991efcd01b73c5b3503fa8adba159d069abe5785c988ebe14fcf8f05d1" +checksum = "4e97b932854da633a767eff0cc805425a2222fc6481e96f463e57b015d949d1d" dependencies = [ "bytes", "log", @@ -2104,9 +2109,9 @@ dependencies = [ [[package]] name = "webrtc-dtls" -version = "0.11.0" +version = "0.12.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b7c9b89fc909f9da0499283b1112cd98f72fec28e55a54a9e352525ca65cd95c" +checksum = "5ccbe4d9049390ab52695c3646c1395c877e16c15fb05d3bda8eee0c7351711c" dependencies = [ "aes", "aes-gcm", @@ -2141,9 +2146,9 @@ dependencies = [ [[package]] name = "webrtc-ice" -version = "0.12.0" +version = "0.13.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0348b28b593f7709ac98d872beb58c0009523df652c78e01b950ab9c537ff17d" +checksum = "eb51bde0d790f109a15bfe4d04f1b56fb51d567da231643cb3f21bb74d678997" dependencies = [ "arc-swap", "async-trait", @@ -2166,9 +2171,9 @@ dependencies = [ [[package]] name = "webrtc-mdns" -version = "0.8.0" +version = "0.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e6dfe9686c6c9c51428da4de415cb6ca2dc0591ce2b63212e23fd9cccf0e316b" +checksum = "979cc85259c53b7b620803509d10d35e2546fa505d228850cbe3f08765ea6ea8" dependencies = [ "log", "socket2", @@ -2179,9 +2184,9 @@ dependencies = [ [[package]] name = "webrtc-media" -version = "0.9.0" +version = "0.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e153be16b8650021ad3e9e49ab6e5fa9fb7f6d1c23c213fd8bbd1a1135a4c704" +checksum = "80041211deccda758a3e19aa93d6b10bc1d37c9183b519054b40a83691d13810" dependencies = [ "byteorder", "bytes", @@ -2192,9 +2197,9 @@ dependencies = [ [[package]] name = "webrtc-sctp" -version = "0.11.0" +version = "0.12.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5faf3846ec4b7e64b56338d62cbafe084aa79806b0379dff5cc74a8b7a2b3063" +checksum = "07439c134425d51d2f10907aaf2f815fdfb587dce19fe94a4ae8b5faf2aae5ae" dependencies = [ "arc-swap", "async-trait", @@ -2210,9 +2215,9 @@ dependencies = [ [[package]] name = "webrtc-srtp" -version = "0.14.0" +version = "0.15.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "771db9993712a8fb3886d5be4613ebf27250ef422bd4071988bf55f1ed1a64fa" +checksum = "01e773f79b09b057ffbda6b03fe7b43403b012a240cf8d05d630674c3723b5bb" dependencies = [ "aead", "aes", @@ -2233,9 +2238,9 @@ dependencies = [ [[package]] name = "webrtc-util" -version = "0.10.0" +version = "0.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1438a8fd0d69c5775afb4a71470af92242dbd04059c61895163aa3c1ef933375" +checksum = "64bfb10dbe6d762f80169ae07cf252bafa1f764b9594d140008a0231c0cdce58" dependencies = [ "async-trait", "bitflags", diff --git a/server/Cargo.toml b/server/Cargo.toml index d94d9de..6aa73de 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -24,7 +24,7 @@ serde = { version = "1.0.197", features = ["derive"] } serde_json = "1.0.115" # WebRTC -webrtc = "0.12.0" +webrtc = "0.13.0" anyhow = "1.0.82" postage = "0.5.0" diff --git a/volcano-sfu/Cargo.toml b/volcano-sfu/Cargo.toml index 7beef38..790589f 100644 --- a/volcano-sfu/Cargo.toml +++ b/volcano-sfu/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "volcano-sfu" -version = "0.3.6" +version = "0.3.7" edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html @@ -29,6 +29,6 @@ serde = { version = "1.0.197", features = ["derive"] } serde_json = "1.0.115" # WebRTC -webrtc = "0.12.0" +webrtc = "0.13.0" toml = "0.8.19" diff --git a/volcano-sfu/src/track/downtrack.rs b/volcano-sfu/src/track/downtrack.rs index 1be124b..170c214 100644 --- a/volcano-sfu/src/track/downtrack.rs +++ b/volcano-sfu/src/track/downtrack.rs @@ -73,7 +73,7 @@ pub struct DownTrackInternal { pub last_ssrc: AtomicU32, pub codec: RTCRtpCodecCapability, pub receiver: Arc, - pub write_stream: Mutex>>, //TrackLocalWriter, + pub write_stream: Mutex>>, //Option, on_bind_handler: Arc>>, #[allow(dead_code)] @@ -253,7 +253,7 @@ impl TrackLocal for DownTrackInternal { let mut payload_type = self.payload_type.lock().await; *payload_type = codec.payload_type; let mut write_stream = self.write_stream.lock().await; - *write_stream = t.write_stream(); + *write_stream = Some(t.write_stream()); let mut mime = self.mime.lock().await; *mime = codec.capability.mime_type.to_lowercase(); self.re_sync.store(true, Ordering::Relaxed); From 1d7cafcead9b782e088392f7437dfff41602c5be Mon Sep 17 00:00:00 2001 From: Ian Parra Date: Sun, 8 Jun 2025 15:25:29 -0400 Subject: [PATCH 20/36] remove turn implementation (will be replaced) --- Cargo.lock | 2 +- Dockerfile | 16 ++++++---------- server/Cargo.toml | 2 +- 3 files changed, 8 insertions(+), 12 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 6a1a749..c3b8c0c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1486,7 +1486,7 @@ dependencies = [ [[package]] name = "server" -version = "0.1.1" +version = "0.1.2" dependencies = [ "anyhow", "async-trait", diff --git a/Dockerfile b/Dockerfile index db53377..6eaee4f 100644 --- a/Dockerfile +++ b/Dockerfile @@ -7,29 +7,25 @@ RUN mkdir volcano WORKDIR /home/rust/volcano COPY Cargo.toml Cargo.lock ./ -# Create lib -RUN USER=root cargo new --lib volcano-sfu COPY volcano-sfu ./volcano-sfu -# Create server -RUN USER=root cargo new --bin server COPY server ./server # Build -RUN cargo build --locked --release -RUN cargo install --path server -RUN rm */src/*.rs target/release/deps/volcano* +#RUN cargo build --locked --release +RUN cargo install --locked --path server --root /usr/local +RUN rm -r */src/*.rs target # Bundle FROM gcr.io/distroless/cc-debian12 -COPY --from=build /usr/local/cargo/bin/server ./volcano-server +COPY --from=build /usr/local/bin/server ./volcano-server COPY config.toml ./config.toml # Signaling server port EXPOSE 4000/tcp -#TURN server port -EXPOSE 3478/udp +# TURN server port +#EXPOSE 3478/udp ENV RUST_LOG=debug diff --git a/server/Cargo.toml b/server/Cargo.toml index 6aa73de..cb3e458 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "server" -version = "0.1.1" +version = "0.1.2" edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html From ff6543838ef87e18e696cd5ba9514fd63227463e Mon Sep 17 00:00:00 2001 From: Ian Parra Date: Sun, 8 Jun 2025 15:26:56 -0400 Subject: [PATCH 21/36] feat: read configuration for room audio observer --- config.toml | 6 ++--- server/src/signaling/client.rs | 7 +++--- server/src/signaling/server.rs | 7 +++--- volcano-sfu/src/rtc/config.rs | 6 ++--- volcano-sfu/src/rtc/peer.rs | 10 +++++---- volcano-sfu/src/rtc/room.rs | 29 ++++++++++++++++--------- volcano-sfu/src/track/audio_observer.rs | 11 ++++++++-- 7 files changed, 46 insertions(+), 30 deletions(-) diff --git a/config.toml b/config.toml index 6cd98bc..5900da5 100644 --- a/config.toml +++ b/config.toml @@ -8,16 +8,16 @@ maxpackettrack = 500 # Values from [0-127] where 0 is the loudest. # Audio levels are read from rtp extension header according to: # https://tools.ietf.org/html/rfc6464 -audiolevelthreshold = 40 +audiolevelthreshold = 65 # Sets the interval in which the SFU will check the audio level # in [ms]. If the active speaker has changed, the sfu will # emit an event to clients. -audiolevelinterval=1000 +audiolevelinterval=500 # Sets minimum percentage of events required to fire an audio level # according to the expected events from the audiolevelinterval, # calculated as audiolevelinterval/packetization time (20ms for 8kHz) # Values from [0-100] -audiolevelfilter = 20 +audiolevelfilter = 50 withstats = false [router.simulcast] diff --git a/server/src/signaling/client.rs b/server/src/signaling/client.rs index 788d047..eb61dd6 100644 --- a/server/src/signaling/client.rs +++ b/server/src/signaling/client.rs @@ -163,9 +163,10 @@ impl Client { offer, cfg, } => { - let room = Room::get(&room_id); + let router_config = &peer.config().router; + let room = Room::get_or_create(&room_id, router_config); self.room = Some(room.clone()); - self.handle_join(write, room.clone(), offer, cfg, id).await + self.handle_join(write, room, offer, cfg, id).await } PacketC2S::Leave => { match &self.room { @@ -255,7 +256,7 @@ impl Client { })) .await; - if let Err(err) = peer.join(room.id.clone(), cfg).await { + if let Err(err) = peer.join(room.clone(), cfg).await { error!("join error: {}", err); return Err(err); } diff --git a/server/src/signaling/server.rs b/server/src/signaling/server.rs index cc1748a..4ddb021 100644 --- a/server/src/signaling/server.rs +++ b/server/src/signaling/server.rs @@ -7,7 +7,6 @@ use volcano_sfu::{ config::{self, WebRTCTransportConfig}, room::Room, }, - turn, }; use super::{ @@ -51,9 +50,9 @@ pub async fn launch(addr: A, auth: AuthFn) -> Result<()> { .inspect_err(|e| error!("Error loading config: {e}. Loading default config.")) .unwrap_or_default(); let config = c.clone(); - if c.turn.enabled { - turn::init_turn_server(c.turn, c.turn_auth).await?; - } + //if c.turn.enabled { + // turn::init_turn_server(c.turn, c.turn_auth).await?; + //} let webrtc_config = Arc::new(WebRTCTransportConfig::new(&config).await?); // Accept new connections let auth = Arc::new(auth); diff --git a/volcano-sfu/src/rtc/config.rs b/volcano-sfu/src/rtc/config.rs index 7e890ea..4e43820 100644 --- a/volcano-sfu/src/rtc/config.rs +++ b/volcano-sfu/src/rtc/config.rs @@ -67,11 +67,9 @@ pub struct RouterConfig { #[serde(rename = "audiolevelinterval")] pub audio_level_interval: i32, #[serde(rename = "audiolevelthreshold")] - #[allow(dead_code)] - audio_level_threshold: u8, + pub audio_level_threshold: u8, #[serde(rename = "audiolevelfilter")] - #[allow(dead_code)] - audio_level_filter: i32, + pub audio_level_filter: i32, pub simulcast: SimulcastConfig, } diff --git a/volcano-sfu/src/rtc/peer.rs b/volcano-sfu/src/rtc/peer.rs index 1cbdfc0..bd72cda 100644 --- a/volcano-sfu/src/rtc/peer.rs +++ b/volcano-sfu/src/rtc/peer.rs @@ -132,16 +132,18 @@ impl Peer { } } + pub fn config(&self) -> Arc { + self.config.clone() + } pub fn id(&self) -> String { self.id.clone() } - pub async fn join(self: &Arc, room_id: String, cfg: JoinConfig) -> Result<()> { + pub async fn join(self: &Arc, room: Arc, cfg: JoinConfig) -> Result<()> { let id = &self.id; - info!("[{id}] Join to {room_id} requested"); + info!("[{id}] Join to {} requested", room.id); - let room = Room::get(&room_id); *self.room.lock().await = Some(room.clone()); let rtc_config_clone = RTCConfiguration { ice_servers: self.config.configuration.ice_servers.clone(), @@ -284,7 +286,7 @@ impl Peer { room.add_peer(self.clone()).await; info!( "[Peer {}] Adds to room {}", - id, room_id + id, room.id ); // Send user join event with no tracks diff --git a/volcano-sfu/src/rtc/room.rs b/volcano-sfu/src/rtc/room.rs index 6d89dbd..ed68896 100644 --- a/volcano-sfu/src/rtc/room.rs +++ b/volcano-sfu/src/rtc/room.rs @@ -19,7 +19,7 @@ use webrtc::{ use serde::Serialize; -use crate::{track::{audio_observer::AudioObserver, router::LocalRouter}}; +use crate::{rtc::config::RouterConfig, track::{audio_observer::AudioObserver, router::LocalRouter}}; use super::peer::Peer; @@ -85,8 +85,12 @@ lazy_static! { impl Room { /// Create a new Room and initialise internal channels and maps - fn new(id: String) -> Arc { + fn new(id: String, router_config: &RouterConfig) -> Arc { let (sender, _dropped) = channel(10); + let audio_threshold = router_config.audio_level_threshold; + let audio_interval = router_config.audio_level_interval; + let audio_filter = router_config.audio_level_filter; + let audio_observer = AudioObserver::new(audio_threshold, audio_interval, audio_filter); Arc::new(Room { closed: Default::default(), @@ -96,7 +100,7 @@ impl Room { signalers: Default::default(), labels: Default::default(), peers: Default::default(), - audio_observer: Arc::new(Mutex::new(AudioObserver::new(65, 600, 50))), + audio_observer: Arc::new(Mutex::new(audio_observer)), user_tracks: Default::default(), tracks: Default::default(), }) @@ -234,13 +238,16 @@ impl Room { } /// Get or create a Room by its ID - pub fn get(id: &str) -> Arc { - if let Some(room) = ROOMS.get(id) { - room.clone() + pub fn get(id: &str) -> Option> { + ROOMS.get(id).map(|room| room.clone()) + } + + pub fn get_or_create(id: &str, router_config: &RouterConfig) -> Arc { + if let Some(room) = Self::get(id) { + room } else { - let room: Arc = Room::new(id.to_owned()); + let room: Arc = Room::new(id.to_owned(), router_config); ROOMS.insert(id.to_string(), room.clone()); - room } } @@ -377,12 +384,14 @@ impl Room { async fn start_audio_observer_task(self: &Arc) { let observer = self.audio_observer.clone(); - let mut interval = interval(Duration::from_millis(500)); + let interval_ms = observer.lock().await.interval as u64; + let mut interval = interval(Duration::from_millis(interval_ms)); let room_out = self.clone(); tokio::spawn(async move { loop { interval.tick().await; - let mut observer_in = observer.lock().await; + let observer_in = &mut observer.lock().await; + let streams = observer_in.calc().await; if let Some(streams) = streams { info!("Streams {:?}", streams); diff --git a/volcano-sfu/src/track/audio_observer.rs b/volcano-sfu/src/track/audio_observer.rs index 50c66ed..d579b00 100644 --- a/volcano-sfu/src/track/audio_observer.rs +++ b/volcano-sfu/src/track/audio_observer.rs @@ -10,8 +10,9 @@ pub struct AudioStream { #[derive(Default, Clone, Debug)] pub struct AudioObserver { streams: Arc>>, - expected: i32, - threshold: u8, + pub expected: i32, + pub threshold: u8, + pub interval: i32, previous: Vec, } @@ -35,6 +36,7 @@ impl AudioObserver { } Self { threshold, + interval: interval_parameter, expected: interval_parameter * filter / 2000, ..Default::default() } @@ -111,4 +113,9 @@ impl AudioObserver { Some(stream_ids) } + + /// Returns true if there are no streams. + pub async fn is_empty(&self) -> bool { + self.streams.lock().await.is_empty() + } } From b8ed9118c306346e22ad7de9d20c793abb0807b8 Mon Sep 17 00:00:00 2001 From: Ian Parra Date: Sun, 8 Jun 2025 18:17:38 -0400 Subject: [PATCH 22/36] fix: use more references, less clonation --- server/src/signaling/client.rs | 6 +++--- volcano-sfu/src/rtc/peer.rs | 4 ++-- volcano-sfu/src/rtc/peer/api.rs | 2 +- volcano-sfu/src/rtc/peer/subscriber.rs | 4 ++-- 4 files changed, 8 insertions(+), 8 deletions(-) diff --git a/server/src/signaling/client.rs b/server/src/signaling/client.rs index eb61dd6..0f7babe 100644 --- a/server/src/signaling/client.rs +++ b/server/src/signaling/client.rs @@ -166,7 +166,7 @@ impl Client { let router_config = &peer.config().router; let room = Room::get_or_create(&room_id, router_config); self.room = Some(room.clone()); - self.handle_join(write, room, offer, cfg, id).await + self.handle_join(write, room, offer, &cfg, id).await } PacketC2S::Leave => { match &self.room { @@ -197,12 +197,12 @@ impl Client { write: &Sender, room: Arc, initial_offer: RTCSessionDescription, - cfg: JoinConfig, + cfg: &JoinConfig, id: u32, ) -> Result<()> { // Signaling was experimental. // room.subscribe_signal(self.signal.clone()).await; - let peer = self.peer.clone(); + let peer = &self.peer; let write_out_1 = write.clone(); let write_out_2 = write.clone(); let write_out_3 = write.clone(); diff --git a/volcano-sfu/src/rtc/peer.rs b/volcano-sfu/src/rtc/peer.rs index bd72cda..7bc6ad3 100644 --- a/volcano-sfu/src/rtc/peer.rs +++ b/volcano-sfu/src/rtc/peer.rs @@ -140,7 +140,7 @@ impl Peer { self.id.clone() } - pub async fn join(self: &Arc, room: Arc, cfg: JoinConfig) -> Result<()> { + pub async fn join(self: &Arc, room: Arc, cfg: &JoinConfig) -> Result<()> { let id = &self.id; info!("[{id}] Join to {} requested", room.id); @@ -158,7 +158,7 @@ impl Peer { if !cfg.no_subscribe { let mut inner_subscriber = - Subscriber::new(self.user_id.clone(), self.config.clone()).await?; + Subscriber::new(self.user_id.clone(), &self.config).await?; inner_subscriber.no_auto_subscribe = cfg.no_auto_subscribe; let subscriber = Arc::new(inner_subscriber); let remote_answer_pending_out = self.remote_answer_pending.clone(); diff --git a/volcano-sfu/src/rtc/peer/api.rs b/volcano-sfu/src/rtc/peer/api.rs index 6bd223d..635d496 100644 --- a/volcano-sfu/src/rtc/peer/api.rs +++ b/volcano-sfu/src/rtc/peer/api.rs @@ -19,7 +19,7 @@ const MIME_TYPE_VP9: &str = "video/vp9"; const FRAME_MARKING: &str = "urn:ietf:params:rtp-hdrext:framemarking"; /// Initialise a new RTCPeerConnection -pub async fn create_subscriber_connection(cfg: Arc) -> Result> { +pub async fn create_subscriber_connection(cfg: &Arc) -> Result> { // Create a MediaEngine object to configure the supported codec let mut m = MediaEngine::default(); //m.register_default_codecs()?; diff --git a/volcano-sfu/src/rtc/peer/subscriber.rs b/volcano-sfu/src/rtc/peer/subscriber.rs index 6720990..92e2cc9 100644 --- a/volcano-sfu/src/rtc/peer/subscriber.rs +++ b/volcano-sfu/src/rtc/peer/subscriber.rs @@ -49,7 +49,7 @@ pub type OnRenegotiateFn = Box Pin> + Send + 'static>>) + Send + Sync>; impl Subscriber { - pub async fn new(id: String, c: Arc) -> Result { + pub async fn new(id: String, c: &Arc) -> Result { let pc = api::create_subscriber_connection(c).await?; let api_channel = pc.create_data_channel(API_CHANNEL_LABEL, Some(RTCDataChannelInit::default())).await?; info!("[Subscriber {id}] Created data channel `{API_CHANNEL_LABEL}` (awaiting for offer)"); @@ -78,7 +78,7 @@ impl Subscriber { info!("[{}] Created data channel `{}` (awaiting for offer)", self.id, ndc.label()); let tracks_out = self.tracks.clone(); - let ndc_1 = ndc.clone(); + let ndc_1: Arc = ndc.clone(); let ndc_2 = ndc.clone(); ndc.on_open(Box::new(move || { Box::pin(async move { From 72fdd5252ce1e211e0250079a015fa6a9a370cc9 Mon Sep 17 00:00:00 2001 From: Ian Parra Date: Mon, 9 Jun 2025 01:15:19 -0400 Subject: [PATCH 23/36] feat: add ice servers to client --- .gitignore | 3 ++- server/src/signaling/packets.rs | 3 ++- server/src/signaling/server.rs | 1 + volcano-sfu/src/rtc/config.rs | 38 ++++++++++++++++----------------- 4 files changed, 24 insertions(+), 21 deletions(-) diff --git a/.gitignore b/.gitignore index ccb5166..34dfdb1 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,3 @@ /target -.vscode \ No newline at end of file +.vscode +config.toml \ No newline at end of file diff --git a/server/src/signaling/packets.rs b/server/src/signaling/packets.rs index 4564aca..332bfde 100644 --- a/server/src/signaling/packets.rs +++ b/server/src/signaling/packets.rs @@ -3,7 +3,7 @@ use tokio_tungstenite::tungstenite::Message; use volcano_sfu::rtc::{peer::JoinConfig, room::RoomInfo}; use webrtc::{ - ice_transport::ice_candidate::RTCIceCandidateInit, + ice_transport::{ice_candidate::RTCIceCandidateInit, ice_server::RTCIceServer}, peer_connection::sdp::session_description::RTCSessionDescription, }; @@ -102,6 +102,7 @@ pub enum PacketS2C { /// Accept authentication Accept { id: u32, + ice_servers: Vec, available_rooms: Vec, }, /// Answer (for client publisher) diff --git a/server/src/signaling/server.rs b/server/src/signaling/server.rs index 4ddb021..83bedd2 100644 --- a/server/src/signaling/server.rs +++ b/server/src/signaling/server.rs @@ -118,6 +118,7 @@ async fn handle_connection( write .send(PacketS2C::Accept { id, + ice_servers: w.configuration.ice_servers.clone(), available_rooms, }) .await?; diff --git a/volcano-sfu/src/rtc/config.rs b/volcano-sfu/src/rtc/config.rs index 4e43820..de410b5 100644 --- a/volcano-sfu/src/rtc/config.rs +++ b/volcano-sfu/src/rtc/config.rs @@ -10,7 +10,9 @@ use crate::{buffer::factory::AtomicFactory, track::error::ConfigError}; #[derive(Clone, Deserialize)] struct ICEServerConfig { urls: Vec, - user_name: String, + #[serde(default)] + username: String, + #[serde(default)] credential: String, } #[derive(Clone, Default, Deserialize)] @@ -41,6 +43,7 @@ pub struct WebRTCConfig { ice_single_port: Option, #[serde(rename = "portrange")] pub ice_port_range: Option>, + #[serde(rename = "iceservers")] ice_servers: Option>, candidates: Candidates, #[serde(rename = "sdpsemantics")] @@ -127,24 +130,21 @@ impl WebRTCTransportConfig { } - let mut ice_servers: Vec = vec![RTCIceServer { - urls: vec!["stun:stun.l.google.com:19302".to_owned(), "stun:stun1.l.google.com:19302".to_owned(), "stun:stun.12connect.com:3478".to_owned()], - ..Default::default() - }]; - - if let Some(ice_lite) = c.webrtc.candidates.ice_lite { - if ice_lite { - se.set_lite(ice_lite); - } else if let Some(ice_servers_cfg) = &c.webrtc.ice_servers { - for ice_server in ice_servers_cfg { - let s = RTCIceServer { - urls: ice_server.urls.clone(), - username: ice_server.user_name.clone(), - credential: ice_server.credential.clone(), - }; - - ice_servers.push(s); - } + let mut ice_servers: Vec = Vec::default(); + let ice_lite = c.webrtc.candidates.ice_lite.unwrap_or_default(); + se.set_lite(ice_lite); + + if !ice_lite { + if let Some(ice_servers_cfg) = &c.webrtc.ice_servers { + for ice_server in ice_servers_cfg { + let s = RTCIceServer { + urls: ice_server.urls.clone(), + username: ice_server.username.clone(), + credential: ice_server.credential.clone(), + }; + + ice_servers.push(s); + } } } From 682c5c972f30d9cbcaa92ba85025ffbb9a097ae9 Mon Sep 17 00:00:00 2001 From: Ian Parra Date: Mon, 9 Jun 2025 01:29:38 -0400 Subject: [PATCH 24/36] feat: add config example --- README.md | 23 ++++++++--- config.example.toml | 98 +++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 115 insertions(+), 6 deletions(-) create mode 100644 config.example.toml diff --git a/README.md b/README.md index a175d68..807af43 100644 --- a/README.md +++ b/README.md @@ -4,17 +4,28 @@ Rust implementation of a WebRTC Selective Forwarding Unit A selective forwarding unit is a video routing service which allows webrtc sessions to scale more efficiently. -## Build and run +## Development -### Build +### Setup + +Clone the repository. ```sh -cargo build +git clone https://github.com/MilcaoStudio/volcano.git +cd volcano ``` -### Run server +Copy `config.example.toml` to `config.toml`. +```sh +cp config.example.toml config.toml +``` + +### Build and run + +It's recommended to enable logging for `server` and `volcano_sfu`. +Set `RUST_LOG="debug"` to enable debug level for all crates (including `webrtc`). ```sh -RUST_LOG=debug -./target/debug/server +RUST_LOG="server, volcano_sfu" +cargo run ``` ### License diff --git a/config.example.toml b/config.example.toml new file mode 100644 index 0000000..8f45904 --- /dev/null +++ b/config.example.toml @@ -0,0 +1,98 @@ +[router] +# Limit the remb bandwidth in kbps +# zero means no limits +maxbandwidth = 1500 +# max number of video tracks packets the SFU will keep track +maxpackettrack = 500 +# Sets the audio level volume threshold. +# Values from [0-127] where 0 is the loudest. +# Audio levels are read from rtp extension header according to: +# https://tools.ietf.org/html/rfc6464 +audiolevelthreshold = 65 +# Sets the interval in which the SFU will check the audio level +# in [ms]. If the active speaker has changed, the sfu will +# emit an event to clients. +audiolevelinterval=500 +# Sets minimum percentage of events required to fire an audio level +# according to the expected events from the audiolevelinterval, +# calculated as audiolevelinterval/packetization time (20ms for 8kHz) +# Values from [0-100] +audiolevelfilter = 50 +withstats = false + +[router.simulcast] +# Prefer best quality initially +bestqualityfirst = true +# EXPERIMENTAL enable temporal layer change is currently an experimental feature, +# enable only for testing. +enabletemporallayer = false + +[webrtc] +# Single port, portrange will not work if you enable this +# singleport = 5000 + +# sdp semantics: +# "unified-plan" +# "plan-b" +# "unified-plan-with-fallback" +sdpsemantics = "unified-plan" +# toggle multicast dns support: https://tools.ietf.org/html/draft-mdns-ice-candidates-00 +mdns = false + +# Range of ports that ion accepts WebRTC traffic on +# Format: [min, max] and max - min >= 100 +portrange = [5000, 5200] +# if sfu behind nat, set iceserver +[[webrtc.iceservers]] +urls = ["stun:stun.l.google.com:19302", "stun:stun1.l.google.com:19302"] +[[webrtc.iceservers]] +# urls = ["turn:turn.awsome.org:3478"] +# username = "awsome" +# credential = "awsome" + +[webrtc.candidates] +# In case you're deploying ion-sfu on a server which is configured with +# a 1:1 NAT (e.g., Amazon EC2), you might want to also specify the public +# address of the machine using the setting below. This will result in +# all host candidates (which normally have a private IP address) to +# be rewritten with the public address provided in the settings. As +# such, use the option with caution and only if you know what you're doing. +# Multiple public IP addresses can be specified as a comma separated list +# if the sfu is deployed in a DMZ between two 1-1 NAT for internal and +# external users. +# nat1to1 = ["1.2.3.4"] +# icelite = true + +[webrtc.timeouts] +# The duration in [sec] without network activity before a ICE Agent is considered disconnected +disconnected = 5 +# The duration in [sec] without network activity before a ICE Agent is considered failed after disconnected +failed = 25 +# How often in [sec] the ICE Agent sends extra traffic if there is no activity, if media is flowing no traffic will be sent +keepalive = 2 + +[turn] +# Enables embeded turn server +enabled = true +# Sets the realm for turn server +realm = "uprising" +# The address the TURN server will listen on. +address = "0.0.0.0:3478" +# Certs path to config tls/dtls +# cert="path/to/cert.pem" +# key="path/to/key.pem" +# Port range that turn relays to SFU +# WARNING: It shouldn't overlap webrtc.portrange +# Format: [min, max] +# portrange = [5201, 5400] + +[turn.auth] +# Use an auth secret to generate long-term credentials defined in RFC5389-10.2 +# NOTE: This takes precedence over `credentials` if defined. +# secret = "secret" +# Sets the credentials pairs +credentials = "uprising=uprising,ion=ion123" + +[log] +# 0 - INFO 1 - DEBUG 2 - TRACE +v = 1 \ No newline at end of file From 634683e364803b8c521e8eb9f83516a2917106c9 Mon Sep 17 00:00:00 2001 From: Ian Parra Date: Mon, 9 Jun 2025 01:35:23 -0400 Subject: [PATCH 25/36] chore: remove config.toml because it may contain sensitive credentials --- config.toml | 98 ----------------------------------------------------- 1 file changed, 98 deletions(-) delete mode 100644 config.toml diff --git a/config.toml b/config.toml deleted file mode 100644 index 5900da5..0000000 --- a/config.toml +++ /dev/null @@ -1,98 +0,0 @@ -[router] -# Limit the remb bandwidth in kbps -# zero means no limits -maxbandwidth = 1500 -# max number of video tracks packets the SFU will keep track -maxpackettrack = 500 -# Sets the audio level volume threshold. -# Values from [0-127] where 0 is the loudest. -# Audio levels are read from rtp extension header according to: -# https://tools.ietf.org/html/rfc6464 -audiolevelthreshold = 65 -# Sets the interval in which the SFU will check the audio level -# in [ms]. If the active speaker has changed, the sfu will -# emit an event to clients. -audiolevelinterval=500 -# Sets minimum percentage of events required to fire an audio level -# according to the expected events from the audiolevelinterval, -# calculated as audiolevelinterval/packetization time (20ms for 8kHz) -# Values from [0-100] -audiolevelfilter = 50 -withstats = false - -[router.simulcast] -# Prefer best quality initially -bestqualityfirst = true -# EXPERIMENTAL enable temporal layer change is currently an experimental feature, -# enable only for testing. -enabletemporallayer = false - -[webrtc] -# Single port, portrange will not work if you enable this -# singleport = 5000 - -# Range of ports that ion accepts WebRTC traffic on -# Format: [min, max] and max - min >= 100 -portrange = [5000, 5200] -# if sfu behind nat, set iceserver -# [[webrtc.iceserver]] -# urls = ["stun:stun.stunprotocol.org:3478"] -# [[webrtc.iceserver]] -# urls = ["turn:turn.awsome.org:3478"] -# username = "awsome" -# credential = "awsome" - -# sdp semantics: -# "unified-plan" -# "plan-b" -# "unified-plan-with-fallback" -sdpsemantics = "unified-plan" -# toggle multicast dns support: https://tools.ietf.org/html/draft-mdns-ice-candidates-00 -mdns = false - -[webrtc.candidates] -# In case you're deploying ion-sfu on a server which is configured with -# a 1:1 NAT (e.g., Amazon EC2), you might want to also specify the public -# address of the machine using the setting below. This will result in -# all host candidates (which normally have a private IP address) to -# be rewritten with the public address provided in the settings. As -# such, use the option with caution and only if you know what you're doing. -# Multiple public IP addresses can be specified as a comma separated list -# if the sfu is deployed in a DMZ between two 1-1 NAT for internal and -# external users. -# nat1to1 = ["1.2.3.4"] -# icelite = true - -[webrtc.timeouts] -# The duration in [sec] without network activity before a ICE Agent is considered disconnected -disconnected = 5 -# The duration in [sec] without network activity before a ICE Agent is considered failed after disconnected -failed = 25 -# How often in [sec] the ICE Agent sends extra traffic if there is no activity, if media is flowing no traffic will be sent -keepalive = 2 - -[turn] -# Enables embeded turn server -enabled = true -# Sets the realm for turn server -realm = "uprising" -# The address the TURN server will listen on. -address = "0.0.0.0:3478" -# Certs path to config tls/dtls -# cert="path/to/cert.pem" -# key="path/to/key.pem" -# Port range that turn relays to SFU -# WARNING: It shouldn't overlap webrtc.portrange -# Format: [min, max] -# portrange = [5201, 5400] - -[turn.auth] -# Use an auth secret to generate long-term credentials defined in RFC5389-10.2 -# NOTE: This takes precedence over `credentials` if defined. -# secret = "secret" -# Sets the credentials pairs -credentials = "uprising=uprising,ion=ion123" - -[log] -# 0 - INFO 1 - DEBUG 2 - TRACE -v = 1 \ No newline at end of file From 58b11170e50173a4c76317144c5388850af30585 Mon Sep 17 00:00:00 2001 From: Ian Parra Date: Tue, 10 Jun 2025 00:17:36 -0400 Subject: [PATCH 26/36] refactor: move default ice port range to config --- volcano-sfu/src/rtc/config.rs | 10 +++++++--- volcano-sfu/src/turn.rs | 4 ---- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/volcano-sfu/src/rtc/config.rs b/volcano-sfu/src/rtc/config.rs index de410b5..822cdbc 100644 --- a/volcano-sfu/src/rtc/config.rs +++ b/volcano-sfu/src/rtc/config.rs @@ -4,9 +4,13 @@ use tokio::{net::UdpSocket, sync::Mutex}; use webrtc::{api::setting_engine::SettingEngine, ice::{mdns::MulticastDnsMode, udp_mux::{UDPMuxDefault, UDPMuxParams}, udp_network::{EphemeralUDP, UDPNetwork}}, ice_transport::{ice_candidate_type::RTCIceCandidateType, ice_server::RTCIceServer}, peer_connection::{configuration::RTCConfiguration, policy::sdp_semantics::RTCSdpSemantics}, turn::auth::AuthHandler,}; use anyhow::Result; -use crate::turn::{self, TurnConfig}; +use crate::turn::TurnConfig; use crate::{buffer::factory::AtomicFactory, track::error::ConfigError}; +// 4096 port range +pub const ICE_MIN_PORT: u16 = 36864; +pub const ICE_MAX_PORT: u16 = 40959; + #[derive(Clone, Deserialize)] struct ICEServerConfig { urls: Vec, @@ -112,8 +116,8 @@ impl WebRTCTransportConfig { let mut ice_port_end: u16 = 0; if c.turn.enabled && c.turn.port_range.is_none() { - ice_port_start = turn::ICE_MIN_PORT; - ice_port_end = turn::ICE_MAX_PORT; + ice_port_start = ICE_MIN_PORT; + ice_port_end = ICE_MAX_PORT; } else if let Some(ice_port_range) = &c.webrtc.ice_port_range { if ice_port_range.len() == 2 { ice_port_start = ice_port_range[0]; diff --git a/volcano-sfu/src/turn.rs b/volcano-sfu/src/turn.rs index 4064696..d3fc94d 100644 --- a/volcano-sfu/src/turn.rs +++ b/volcano-sfu/src/turn.rs @@ -8,10 +8,6 @@ use webrtc::util::vnet::net::*; pub const TURN_MIN_PORT: u16 = 32768; pub const TURN_MAX_PORT: u16 = 36863; -// 4096 port range -pub const ICE_MIN_PORT: u16 = 36864; -pub const ICE_MAX_PORT: u16 = 40959; - #[derive(Clone, Default, Deserialize)] pub(super) struct TurnAuth { #[serde(rename = "credentials")] From f92def9ca2d004052f0f580737386386d1ec4dc8 Mon Sep 17 00:00:00 2001 From: Ian Parra Date: Tue, 10 Jun 2025 00:18:33 -0400 Subject: [PATCH 27/36] fix: send error to client on failed connection --- server/src/signaling/client.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/signaling/client.rs b/server/src/signaling/client.rs index 0f7babe..c9143fb 100644 --- a/server/src/signaling/client.rs +++ b/server/src/signaling/client.rs @@ -152,7 +152,7 @@ impl Client { let peer = self.peer.clone(); match packet { PacketC2S::Answer { description } => peer.set_remote_description(description).await, - PacketC2S::Connect { .. } => Err(ServerError::AlreadyConnected.into()), + PacketC2S::Connect { .. } => write.send(PacketS2C::ServerError { error: ServerError::AlreadyConnected }).await, PacketC2S::Continue { .. } => { // TODO: Add Continue event Ok(()) From aabce2e0d73794f7faaedcdc2b984ac02854910b Mon Sep 17 00:00:00 2001 From: Ian Parra Date: Tue, 10 Jun 2025 00:29:50 -0400 Subject: [PATCH 28/36] feat: send user_id in Accept --- server/src/signaling/packets.rs | 1 + server/src/signaling/server.rs | 2 ++ 2 files changed, 3 insertions(+) diff --git a/server/src/signaling/packets.rs b/server/src/signaling/packets.rs index 332bfde..d9ae5ee 100644 --- a/server/src/signaling/packets.rs +++ b/server/src/signaling/packets.rs @@ -102,6 +102,7 @@ pub enum PacketS2C { /// Accept authentication Accept { id: u32, + user_id: String, ice_servers: Vec, available_rooms: Vec, }, diff --git a/server/src/signaling/server.rs b/server/src/signaling/server.rs index 83bedd2..77dc1ac 100644 --- a/server/src/signaling/server.rs +++ b/server/src/signaling/server.rs @@ -108,6 +108,7 @@ async fn handle_connection( if let Ok(user) = (auth)(token).await { info!("Authenticated user {}", user.id); + let user_id = user.id.clone(); // Create a new client client = Some(Client::new(user, Arc::clone(&w)).await?); @@ -118,6 +119,7 @@ async fn handle_connection( write .send(PacketS2C::Accept { id, + user_id, ice_servers: w.configuration.ice_servers.clone(), available_rooms, }) From 6cc5f0c68f07c0301bdf69f8a28fb738c2f1c088 Mon Sep 17 00:00:00 2001 From: Ian Parra Date: Wed, 11 Jun 2025 11:05:43 -0400 Subject: [PATCH 29/36] feat: Calculate audio if there are streams --- volcano-sfu/src/rtc/room.rs | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/volcano-sfu/src/rtc/room.rs b/volcano-sfu/src/rtc/room.rs index ed68896..d04c39b 100644 --- a/volcano-sfu/src/rtc/room.rs +++ b/volcano-sfu/src/rtc/room.rs @@ -390,9 +390,19 @@ impl Room { tokio::spawn(async move { loop { interval.tick().await; - let observer_in = &mut observer.lock().await; + let is_empty = { + observer.lock().await.is_empty().await + }; - let streams = observer_in.calc().await; + if is_empty { + continue; + } + + let streams = { + let mut observer = observer.lock().await; + observer.calc().await + }; + if let Some(streams) = streams { info!("Streams {:?}", streams); room_out.send_message(RoomEvent::VoiceActivity { room_id: room_out.id.clone(), stream_ids: streams }).await; From 1536d113d9585de0fcf0fdb31bec8e4242069fe5 Mon Sep 17 00:00:00 2001 From: Ian Parra Date: Wed, 11 Jun 2025 11:32:25 -0400 Subject: [PATCH 30/36] feat: bump to 0.3.8 --- Cargo.lock | 2 +- server/src/signaling/server.rs | 1 + volcano-sfu/Cargo.toml | 2 +- volcano-sfu/src/rtc/config.rs | 5 +++-- volcano-sfu/src/rtc/peer.rs | 5 +++-- 5 files changed, 9 insertions(+), 6 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index c3b8c0c..e9b70c7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1948,7 +1948,7 @@ checksum = "49874b5167b65d7193b8aba1567f5c7d93d001cafc34600cee003eda787e483f" [[package]] name = "volcano-sfu" -version = "0.3.7" +version = "0.3.8" dependencies = [ "anyhow", "async-trait", diff --git a/server/src/signaling/server.rs b/server/src/signaling/server.rs index 77dc1ac..b49cc8d 100644 --- a/server/src/signaling/server.rs +++ b/server/src/signaling/server.rs @@ -54,6 +54,7 @@ pub async fn launch(addr: A, auth: AuthFn) -> Result<()> { // turn::init_turn_server(c.turn, c.turn_auth).await?; //} let webrtc_config = Arc::new(WebRTCTransportConfig::new(&config).await?); + info!("WebRTC configuration for SFU v{} loaded!", webrtc_config.version); // Accept new connections let auth = Arc::new(auth); while let Ok((stream, _)) = listener.accept().await { diff --git a/volcano-sfu/Cargo.toml b/volcano-sfu/Cargo.toml index 790589f..ea43d05 100644 --- a/volcano-sfu/Cargo.toml +++ b/volcano-sfu/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "volcano-sfu" -version = "0.3.7" +version = "0.3.8" edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html diff --git a/volcano-sfu/src/rtc/config.rs b/volcano-sfu/src/rtc/config.rs index 822cdbc..fdcf9d2 100644 --- a/volcano-sfu/src/rtc/config.rs +++ b/volcano-sfu/src/rtc/config.rs @@ -28,6 +28,7 @@ struct Candidates { } #[derive(Default)] pub struct WebRTCTransportConfig { + pub version: String, pub configuration: RTCConfiguration, pub setting: SettingEngine, pub router: RouterConfig, @@ -190,6 +191,7 @@ impl WebRTCTransportConfig { setting: se, router: c.router.clone(), factory: Arc::new(Mutex::new(AtomicFactory::new(1000, 1000))), + version: env!("CARGO_PKG_VERSION").to_string(), }; if let Some(nat1toiips) = &c.webrtc.candidates.nat1_to_1ips { @@ -203,8 +205,7 @@ impl WebRTCTransportConfig { w.setting .set_ice_multicast_dns_mode(MulticastDnsMode::Disabled); } - - info!("WebRTCTransport configuration finished"); + Ok(w) } } \ No newline at end of file diff --git a/volcano-sfu/src/rtc/peer.rs b/volcano-sfu/src/rtc/peer.rs index 7bc6ad3..908298d 100644 --- a/volcano-sfu/src/rtc/peer.rs +++ b/volcano-sfu/src/rtc/peer.rs @@ -149,11 +149,12 @@ impl Peer { ice_servers: self.config.configuration.ice_servers.clone(), ..Default::default() }; - let config = WebRTCTransportConfig { + let peer_config = WebRTCTransportConfig { configuration: rtc_config_clone, setting: self.config.setting.clone(), router: self.config.router.clone(), factory: Arc::new(Mutex::new(AtomicFactory::new(1000, 1000))), + version: self.config.version.clone(), }; if !cfg.no_subscribe { @@ -242,7 +243,7 @@ impl Peer { let closed_out_1 = self.closed.clone(); let publisher = - Arc::new(Publisher::new(self.user_id.clone(), room.clone(), config).await?); + Arc::new(Publisher::new(self.user_id.clone(), room.clone(), peer_config).await?); publisher.on_ice_candidate(Box::new(move |candidate: Option| { let on_ice_candidate_in = on_ice_candidate_out.clone(); From 2a641c47b158e008a456963580b7e553d4dd5793 Mon Sep 17 00:00:00 2001 From: Ian Parra Date: Thu, 19 Jun 2025 14:17:46 -0400 Subject: [PATCH 31/36] feat: reduce container size by installing and copying --- Dockerfile | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/Dockerfile b/Dockerfile index 6eaee4f..21508b5 100644 --- a/Dockerfile +++ b/Dockerfile @@ -6,20 +6,16 @@ WORKDIR /home/rust RUN mkdir volcano WORKDIR /home/rust/volcano COPY Cargo.toml Cargo.lock ./ - COPY volcano-sfu ./volcano-sfu - COPY server ./server # Build -#RUN cargo build --locked --release RUN cargo install --locked --path server --root /usr/local -RUN rm -r */src/*.rs target # Bundle FROM gcr.io/distroless/cc-debian12 -COPY --from=build /usr/local/bin/server ./volcano-server -COPY config.toml ./config.toml +COPY --from=build /usr/local/bin/server /etc/volcano/volcano-server +COPY server/config.toml /etc/volcano/config.toml # Signaling server port EXPOSE 4000/tcp @@ -29,4 +25,4 @@ EXPOSE 4000/tcp ENV RUST_LOG=debug -CMD ["./volcano-server"] \ No newline at end of file +CMD ["./etc/volcano/volcano-server"] \ No newline at end of file From bcaca9dce284caf05a9d5be12c1f4e5721883384 Mon Sep 17 00:00:00 2001 From: Ian Parra Date: Thu, 19 Jun 2025 16:22:18 -0400 Subject: [PATCH 32/36] feat: add security check --- .github/workflows/build.yml | 24 ++++++++++++++++++++++++ 1 file changed, 24 insertions(+) create mode 100644 .github/workflows/build.yml diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml new file mode 100644 index 0000000..d61423d --- /dev/null +++ b/.github/workflows/build.yml @@ -0,0 +1,24 @@ +name: build +on: + pull_request: + branches: + - main + - dev +jobs: + build: + name: Build + runs-on: ubuntu-latest + steps: + - name: Checkout code + uses: actions/checkout@v3 + - name: Build an image from Dockerfile + run: docker build -t docker.io/account0123/volcano:${{ github.sha }} . + - name: Run Trivy vulnerability scanner + uses: aquasecurity/trivy-action@0.28.0 + with: + image-ref: 'docker.io/account0123/volcano:${{ github.sha }}' + format: 'table' + exit-code: '1' + ignore-unfixed: true + vuln-type: 'os,library' + severity: 'CRITICAL,HIGH' \ No newline at end of file From 7e3407ddf5a8b70bcff4aaf8b884fd0a61aa3f85 Mon Sep 17 00:00:00 2001 From: Ian Parra Date: Thu, 19 Jun 2025 16:25:28 -0400 Subject: [PATCH 33/36] feat: add cli option for locating config path --- Cargo.lock | 122 ++++++++++++++++++++++++++++++++- Cargo.toml | 2 +- server/Cargo.toml | 3 +- server/src/main.rs | 20 +++++- server/src/signaling/server.rs | 19 ++--- 5 files changed, 148 insertions(+), 18 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index e9b70c7..a2184fd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -61,6 +61,55 @@ dependencies = [ "memchr", ] +[[package]] +name = "anstream" +version = "0.6.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "64e15c1ab1f89faffbf04a634d5e1962e9074f2741eef6d97f3c4e322426d526" +dependencies = [ + "anstyle", + "anstyle-parse", + "anstyle-query", + "anstyle-wincon", + "colorchoice", + "is_terminal_polyfill", + "utf8parse", +] + +[[package]] +name = "anstyle" +version = "1.0.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "862ed96ca487e809f1c8e5a8447f6ee2cf102f846893800b20cebdf541fc6bbd" + +[[package]] +name = "anstyle-parse" +version = "0.2.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4e7644824f0aa2c7b9384579234ef10eb7efb6a0deb83f9630a49594dd9c15c2" +dependencies = [ + "utf8parse", +] + +[[package]] +name = "anstyle-query" +version = "1.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6d36fc52c7f6c869915e99412912f22093507da8d9e942ceaf66fe4b7c14422a" +dependencies = [ + "windows-sys 0.52.0", +] + +[[package]] +name = "anstyle-wincon" +version = "3.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5bf74e1b6e971609db8ca7a9ce79fd5768ab6ae46441c572e46cf596f59e57f8" +dependencies = [ + "anstyle", + "windows-sys 0.52.0", +] + [[package]] name = "anyhow" version = "1.0.82" @@ -271,6 +320,52 @@ dependencies = [ "inout", ] +[[package]] +name = "clap" +version = "4.5.40" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "40b6887a1d8685cebccf115538db5c0efe625ccac9696ad45c409d96566e910f" +dependencies = [ + "clap_builder", + "clap_derive", +] + +[[package]] +name = "clap_builder" +version = "4.5.40" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e0c66c08ce9f0c698cbce5c0279d0bb6ac936d8674174fe48f736533b964f59e" +dependencies = [ + "anstream", + "anstyle", + "clap_lex", + "strsim", +] + +[[package]] +name = "clap_derive" +version = "4.5.40" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d2c7947ae4cc3d851207c1adb5b5e260ff0cca11446b1d6d1423788e442257ce" +dependencies = [ + "heck", + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "clap_lex" +version = "0.7.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b94f61472cee1439c0b966b47e3aca9ae07e45d070759512cd390ea2bebc6675" + +[[package]] +name = "colorchoice" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b05b61dc5112cbb17e4b6cd61790d9845d13888356391624cbe7e41efeac1e75" + [[package]] name = "const-oid" version = "0.9.6" @@ -689,6 +784,12 @@ version = "0.15.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1e087f84d4f86bf4b218b927129862374b72199ae7d8657835f1e89000eea4fb" +[[package]] +name = "heck" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea" + [[package]] name = "hermit-abi" version = "0.3.9" @@ -809,6 +910,12 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "is_terminal_polyfill" +version = "1.70.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7943c866cc5cd64cbc25b2e01621d07fa8eb2a1a23160ee81ce38704e97b8ecf" + [[package]] name = "itoa" version = "1.0.11" @@ -1490,6 +1597,7 @@ version = "0.1.2" dependencies = [ "anyhow", "async-trait", + "clap", "futures", "log", "postage", @@ -1600,6 +1708,12 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f" +[[package]] +name = "strsim" +version = "0.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7da8b5736845d9f2fcb837ea5d9e2628564b3b043a70948a3f0b778838c5fb4f" + [[package]] name = "stun" version = "0.8.0" @@ -1931,6 +2045,12 @@ version = "0.7.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "09cc8ee72d2a9becf2f2febe0205bbed8fc6615b7cb429ad062dc7b7ddd036a9" +[[package]] +name = "utf8parse" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821" + [[package]] name = "uuid" version = "1.8.0" @@ -1948,7 +2068,7 @@ checksum = "49874b5167b65d7193b8aba1567f5c7d93d001cafc34600cee003eda787e483f" [[package]] name = "volcano-sfu" -version = "0.3.8" +version = "0.3.9" dependencies = [ "anyhow", "async-trait", diff --git a/Cargo.toml b/Cargo.toml index 93bf034..ed69efe 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,3 +1,3 @@ [workspace] members = ["volcano-sfu", "server"] -resolver = "2" \ No newline at end of file +resolver = "2" diff --git a/server/Cargo.toml b/server/Cargo.toml index cb3e458..ebe4545 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -28,4 +28,5 @@ webrtc = "0.13.0" anyhow = "1.0.82" postage = "0.5.0" -volcano-sfu = {path = "../volcano-sfu"} \ No newline at end of file +volcano-sfu = {path = "../volcano-sfu"} +clap = { version = "4.5.40", features = ["derive"] } diff --git a/server/src/main.rs b/server/src/main.rs index 74ffdb9..9962007 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -1,3 +1,8 @@ +use std::fs; + +use clap::Parser; +use volcano_sfu::rtc::config; + #[macro_use] extern crate log; #[macro_use] @@ -5,10 +10,21 @@ extern crate serde; pub mod signaling; +#[derive(clap::Parser)] +struct Cli { + #[arg(short = 'c', long = "config", default_value = "./config.toml")] + config_path: String, +} + #[tokio::main] async fn main() -> anyhow::Result<()> { pretty_env_logger::init_timed(); - signaling::server::launch("0.0.0.0:4000", Box::new(move |token| { + let cli = Cli::parse(); + let content = fs::read_to_string(cli.config_path).inspect_err(|e| error!("Error loading config file: {e}")).unwrap_or_default(); + let config = config::load(&content) + .inspect_err(|e| error!("Error loading config: {e}. Loading default config.")) + .unwrap_or_default(); + signaling::server::launch("0.0.0.0:4000", config, Box::new(move |token| { Box::pin(async move { use signaling::server::{UserCapabilities, UserInformation}; @@ -24,4 +40,4 @@ async fn main() -> anyhow::Result<()> { }) }) })).await -} +} \ No newline at end of file diff --git a/server/src/signaling/server.rs b/server/src/signaling/server.rs index b49cc8d..073de8d 100644 --- a/server/src/signaling/server.rs +++ b/server/src/signaling/server.rs @@ -1,13 +1,11 @@ use anyhow::Result; use futures::{Future, StreamExt}; -use std::{fs, pin::Pin, sync::Arc}; +use std::{pin::Pin, sync::Arc}; use tokio::net::{TcpListener, TcpStream, ToSocketAddrs}; -use volcano_sfu::{ - rtc::{ - config::{self, WebRTCTransportConfig}, +use volcano_sfu::rtc::{ + config::{Config, WebRTCTransportConfig}, room::Room, - }, -}; + }; use super::{ client::Client, @@ -38,18 +36,13 @@ type AuthFn = Box< >; /// Launch a new signaling server -pub async fn launch(addr: A, auth: AuthFn) -> Result<()> { +pub async fn launch(addr: A, config: Config, auth: AuthFn) -> Result<()> { // Create TCP listener let try_socket = TcpListener::bind(addr).await; let listener = try_socket.expect("Failed to bind"); info!("Server listening on {}", listener.local_addr().unwrap()); - - let content = fs::read_to_string("./config.toml")?; - let c = config::load(&content) - .inspect_err(|e| error!("Error loading config: {e}. Loading default config.")) - .unwrap_or_default(); - let config = c.clone(); + //if c.turn.enabled { // turn::init_turn_server(c.turn, c.turn_auth).await?; //} From 620a7713aed9e73ad140e8dff7ccfceed0ecb827 Mon Sep 17 00:00:00 2001 From: Ian Parra Date: Thu, 19 Jun 2025 16:27:08 -0400 Subject: [PATCH 34/36] fix: audio observer works by publisher/router instead working by room --- volcano-sfu/Cargo.toml | 2 +- volcano-sfu/src/rtc/peer/publisher.rs | 10 + volcano-sfu/src/rtc/room.rs | 39 +-- volcano-sfu/src/track/audio_observer.rs | 4 +- volcano-sfu/src/track/downtrack.rs | 6 +- volcano-sfu/src/track/receiver.rs | 20 +- volcano-sfu/src/track/router.rs | 341 ++++++++++++++++-------- 7 files changed, 259 insertions(+), 163 deletions(-) diff --git a/volcano-sfu/Cargo.toml b/volcano-sfu/Cargo.toml index ea43d05..c17c3f9 100644 --- a/volcano-sfu/Cargo.toml +++ b/volcano-sfu/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "volcano-sfu" -version = "0.3.8" +version = "0.3.9" edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html diff --git a/volcano-sfu/src/rtc/peer/publisher.rs b/volcano-sfu/src/rtc/peer/publisher.rs index df35be5..5f18725 100644 --- a/volcano-sfu/src/rtc/peer/publisher.rs +++ b/volcano-sfu/src/rtc/peer/publisher.rs @@ -1,3 +1,4 @@ +use std::collections::BTreeSet; use std::pin::Pin; use std::future::Future; use std::sync::Arc; @@ -121,6 +122,15 @@ impl Publisher { } pub async fn close(&self) { + let observer = self.room.audio_observer.lock().await; + + // Remove publisher streams from audio observer + let tracks = &*self.tracks.lock().await; + let stream_ids: BTreeSet = tracks.iter().map(|t| t.track.stream_id()).collect(); + for stream_id in &stream_ids { + observer.remove_stream(stream_id).await; + } + self.router.stop().await; if let Err(err) = self.pc.close().await { error!("close err: {}", err); diff --git a/volcano-sfu/src/rtc/room.rs b/volcano-sfu/src/rtc/room.rs index d04c39b..bdc2e0e 100644 --- a/volcano-sfu/src/rtc/room.rs +++ b/volcano-sfu/src/rtc/room.rs @@ -8,7 +8,7 @@ use postage::{ broadcast::{channel, Receiver, Sender}, sink::Sink, }; -use tokio::{sync::Mutex, time::{interval, Duration}}; +use tokio::sync::Mutex; use ulid::Ulid; use webrtc::{ data::data_channel::DataChannel, data_channel::{ @@ -382,36 +382,6 @@ impl Room { return RoomInfo { id: self.id.clone(), users }; } - async fn start_audio_observer_task(self: &Arc) { - let observer = self.audio_observer.clone(); - let interval_ms = observer.lock().await.interval as u64; - let mut interval = interval(Duration::from_millis(interval_ms)); - let room_out = self.clone(); - tokio::spawn(async move { - loop { - interval.tick().await; - let is_empty = { - observer.lock().await.is_empty().await - }; - - if is_empty { - continue; - } - - let streams = { - let mut observer = observer.lock().await; - observer.calc().await - }; - - if let Some(streams) = streams { - info!("Streams {:?}", streams); - room_out.send_message(RoomEvent::VoiceActivity { room_id: room_out.id.clone(), stream_ids: streams }).await; - } - } - }); - info!("Audio observer task started"); - } - pub async fn subscribe(self: &Arc, peer: Arc) { info!("Subscribing a new peer"); @@ -435,6 +405,10 @@ impl Room { } } + if let Some(publisher) = peer.publisher().await { + publisher.router().start_audio_observer_task().await; + } + for cur_peer in self.peers.iter() { let cur_id = cur_peer.id(); let peer_id = peer.id(); @@ -466,9 +440,6 @@ impl Room { // Offer API data channel to client subscriber self.add_api_channel(&peer.id()).await; - - // Start audio observer task - self.start_audio_observer_task().await; } /// Remove a user from the room pub async fn remove_user(&self, id: &str) { diff --git a/volcano-sfu/src/track/audio_observer.rs b/volcano-sfu/src/track/audio_observer.rs index d579b00..b3dddc6 100644 --- a/volcano-sfu/src/track/audio_observer.rs +++ b/volcano-sfu/src/track/audio_observer.rs @@ -49,7 +49,7 @@ impl AudioObserver { }) } - pub async fn remove_stream(&mut self, stream_id: &str) { + pub async fn remove_stream(&self, stream_id: &str) { debug!("Remove stream {}", stream_id); let mut streams = self.streams.lock().await; streams.retain(|stream| !stream.id.eq(stream_id)); @@ -91,6 +91,8 @@ impl AudioObserver { if stream.total >= self.expected { debug!("[stream {}] {}/{} (acceptable)", stream.id, stream.total, self.expected); stream_ids.push(stream.id.clone()); + } else { + debug!("[stream {}] {}/{} (not acceptable)", stream.id, stream.total, self.expected); } stream.total = 0; diff --git a/volcano-sfu/src/track/downtrack.rs b/volcano-sfu/src/track/downtrack.rs index 170c214..59b9293 100644 --- a/volcano-sfu/src/track/downtrack.rs +++ b/volcano-sfu/src/track/downtrack.rs @@ -223,7 +223,7 @@ impl DownTrackInternal { } if !fwd_pkts.is_empty() { - if let Err(err) = receiver.send_rtcp(fwd_pkts) { + if let Err(err) = receiver.send_rtcp(fwd_pkts).await { log::error!("send_rtcp err:{}", err); } } @@ -658,7 +658,7 @@ impl DownTrack { receiver.send_rtcp(vec![Box::new(PictureLossIndication { sender_ssrc: ssrc, media_ssrc, - })])?; + })]).await?; return Ok(()); } @@ -742,7 +742,7 @@ impl DownTrack { receiver.send_rtcp(vec![Box::new(PictureLossIndication { sender_ssrc: ssrc, media_ssrc: ext_packet.packet.header.ssrc, - })])?; + })]).await?; return Ok(()); } diff --git a/volcano-sfu/src/track/receiver.rs b/volcano-sfu/src/track/receiver.rs index 1fa965e..9235042 100644 --- a/volcano-sfu/src/track/receiver.rs +++ b/volcano-sfu/src/track/receiver.rs @@ -7,7 +7,7 @@ use std::time::{SystemTime, UNIX_EPOCH}; use async_trait::async_trait; use bytes::{Bytes, BytesMut}; -use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender}; +use tokio::sync::mpsc::{self, Sender}; use tokio::sync::Mutex; use webrtc::rtcp::payload_feedbacks::picture_loss_indication::PictureLossIndication; use webrtc::rtp::packet::Packet as RTCPacket; @@ -26,8 +26,8 @@ use super::error::{Error, Result}; use super::sequencer::PacketMeta; use super::{modify_vp8_temporal_payload, simulcast}; -pub type RtcpDataReceiver = UnboundedReceiver>>; -pub type RtcpDataSender = UnboundedSender>>; +pub type RtcpDataReceiver = mpsc::Receiver>>; +pub type RtcpDataSender = mpsc::Sender>>; pub type OnCloseHandlerFn = Box Pin + Send + 'static>>) + Send + Sync>; @@ -55,10 +55,10 @@ pub trait Receiver: Send + Sync { -> Result<()>; async fn delete_down_track(&self, layer: usize, id: String); async fn register_on_close(&self, f: OnCloseHandlerFn); - fn send_rtcp(&self, p: Vec>) -> Result<()>; + async fn send_rtcp(&self, p: Vec>) -> Result<()>; fn set_rtcp_channel( &mut self, - sender: Arc>>>, + sender: Arc>>>, ); async fn get_sender_report_time(&self, layer: usize) -> (u32, u64); fn as_any(&self) -> &(dyn Any + Send + Sync); @@ -93,7 +93,7 @@ pub struct WebRTCReceiver { impl WebRTCReceiver { pub async fn new(receiver: Arc, track: Arc, pid: String) -> Self { - let (s, _) = tokio::sync::mpsc::unbounded_channel(); + let (s, _) = tokio::sync::mpsc::channel(1024); Self { peer_id: pid, receiver, @@ -322,7 +322,7 @@ impl Receiver for WebRTCReceiver { *handler = Some(f); } - fn send_rtcp(&self, p: Vec>) -> Result<()> { + async fn send_rtcp(&self, p: Vec>) -> Result<()> { // Checks if first packet is PLI if let Some(packet) = p.get(0) { if packet.as_any().downcast_ref::().is_some() { @@ -342,7 +342,7 @@ impl Receiver for WebRTCReceiver { } } - if self.rtcp_sender.send(p).is_err() { + if self.rtcp_sender.send(p).await.is_err() { return Err(Error::ErrChannelSend); } @@ -351,7 +351,7 @@ impl Receiver for WebRTCReceiver { fn set_rtcp_channel( &mut self, - sender: Arc>>>, + sender: Arc>>>, ) { self.rtcp_sender = sender; } @@ -458,7 +458,7 @@ impl Receiver for WebRTCReceiver { self.send_rtcp(vec![Box::new(PictureLossIndication { sender_ssrc, media_ssrc, - })])?; + })]).await?; } } diff --git a/volcano-sfu/src/track/router.rs b/volcano-sfu/src/track/router.rs index 3c2a50c..d67c746 100644 --- a/volcano-sfu/src/track/router.rs +++ b/volcano-sfu/src/track/router.rs @@ -1,25 +1,28 @@ -use std::{pin::Pin, sync::Arc}; use std::future::Future; +use std::{pin::Pin, sync::Arc}; use dashmap::DashMap; -use tokio::sync::mpsc; -use tokio::sync::{mpsc::{UnboundedReceiver, UnboundedSender}, Mutex}; +use tokio::sync::broadcast::Sender; +use tokio::sync::{broadcast, mpsc}; +use tokio::sync::Mutex; +use tokio::time::{sleep, Duration}; +use webrtc::error::Error as RTCError; use webrtc::peer_connection::peer_connection_state::RTCPeerConnectionState; use webrtc::rtcp::packet::Packet as RtcpPacket; -use webrtc::error::Error as RTCError; use webrtc::rtp_transceiver::rtp_codec::{RTCRtpCodecCapability, RTPCodecType}; use webrtc::rtp_transceiver::rtp_receiver::RTCRtpReceiver; use webrtc::rtp_transceiver::rtp_transceiver_direction::RTCRtpTransceiverDirection; use webrtc::rtp_transceiver::{RTCPFeedback, RTCRtpTransceiverInit}; use webrtc::track::track_remote::TrackRemote; -use crate::rtc::peer::subscriber::Subscriber; -use crate::rtc::room::Room; -use crate::{buffer::factory::AtomicFactory, buffer::buffer::BufferIO, rtc::config::RouterConfig}; -use crate::buffer::buffer::Options as BufferOptions; use super::downtrack::{DownTrack, DownTrackInternal}; use super::error::Result; use super::receiver::{Receiver, RtcpDataReceiver, RtcpDataSender, WebRTCReceiver}; +use crate::buffer::buffer::Options as BufferOptions; +use crate::rtc::peer::subscriber::Subscriber; +use crate::rtc::room::{Room, RoomEvent}; +use crate::track::audio_observer::AudioObserver; +use crate::{buffer::buffer::BufferIO, buffer::factory::AtomicFactory, rtc::config::RouterConfig}; pub type RtcpWriterFn = Box< dyn (FnMut( @@ -44,11 +47,11 @@ pub type OnDelReciverTrackFn = Box< >; pub struct LocalRouter { id: String, + audio_observer: Arc>, //twcc: Arc>>, rtcp_sender_channel: Arc, rtcp_receiver_channel: Arc>, - stop_sender_channel: Arc>>, - stop_receiver_channel: Arc>>, + stop_sender_channel: Arc>, config: RouterConfig, receivers: Arc>>>, buffer_factory: AtomicFactory, @@ -60,16 +63,20 @@ pub struct LocalRouter { impl LocalRouter { pub fn new(id: String, room: Arc, config: RouterConfig) -> Self { - let (s, r) = mpsc::unbounded_channel(); - let (sender, receiver) = mpsc::unbounded_channel(); + let (s, r) = mpsc::channel(1024); + let (sender, _) = broadcast::channel(1); + let audio_threshold = config.audio_level_threshold; + let audio_interval = config.audio_level_interval; + let audio_filter = config.audio_level_filter; + let audio_observer = AudioObserver::new(audio_threshold, audio_interval, audio_filter); Self { id, //twcc: Arc::new(Mutex::new(None)), //stats: Arc::new(Mutex::new(HashMap::new())), + audio_observer: Arc::new(Mutex::new(audio_observer)), rtcp_sender_channel: Arc::new(s), rtcp_receiver_channel: Arc::new(Mutex::new(r)), - stop_sender_channel: Arc::new(Mutex::new(sender)), - stop_receiver_channel: Arc::new(Mutex::new(receiver)), + stop_sender_channel: Arc::new(sender), config, receivers: Arc::new(Mutex::new(DashMap::new())), room, @@ -80,7 +87,7 @@ impl LocalRouter { } } - /* + /* fn get_receivers(&self) -> Arc>>> { self.receivers.clone() }*/ @@ -98,7 +105,11 @@ impl LocalRouter { self.id.clone() } - pub async fn add_down_track(&self, subscriber: Arc, receiver: Arc) -> Result>> { + pub async fn add_down_track( + &self, + subscriber: Arc, + receiver: Arc, + ) -> Result>> { let downtracks = subscriber.get_tracks(&receiver.stream_id()).await; // Checks for available tracks if let Some(downtracks_data) = downtracks { @@ -112,7 +123,11 @@ impl LocalRouter { } let codec = receiver.codec(); - subscriber.m.lock().await.register_codec(codec.clone(), receiver.kind())?; + subscriber + .m + .lock() + .await + .register_codec(codec.clone(), receiver.kind())?; let codec_capability = RTCRtpCodecCapability { mime_type: codec.capability.mime_type, clock_rate: codec.capability.clock_rate, @@ -135,14 +150,22 @@ impl LocalRouter { }; // New local down track - let down_track_local = Arc::new(DownTrackInternal::new(codec_capability, receiver.clone(), self.config.max_packet_track).await); - let transceiver = - subscriber.pc.add_transceiver_from_track( + let down_track_local = Arc::new( + DownTrackInternal::new( + codec_capability, + receiver.clone(), + self.config.max_packet_track, + ) + .await, + ); + let transceiver = subscriber + .pc + .add_transceiver_from_track( down_track_local.clone(), Some(RTCRtpTransceiverInit { direction: RTCRtpTransceiverDirection::Sendonly, send_encodings: Vec::new(), - }) + }), ) .await?; // New local track @@ -154,65 +177,77 @@ impl LocalRouter { let s_out = subscriber.clone(); let r_out = receiver.clone(); let down_track_out = down_track_arc.clone(); - down_track_arc.register_on_close(Box::new(move || { - let s_in = s_out.clone(); - let r_in = r_out.clone(); - let transceiver_in = transceiver.clone(); - let down_track_in = down_track_out.clone(); - Box::pin(async move { - if s_in.pc.connection_state() != RTCPeerConnectionState::Closed { - // Remove track from subscriber peer connection - let rv = s_in - .pc - .remove_track(&transceiver_in.sender().await) - .await; - match rv { - Ok(_) => { - info!("Remove DownTrack for {}", &r_in.stream_id()); - s_in.remove_down_track(&r_in.stream_id(), &down_track_in) - .await; - info!("RemoveDownTrack Negotiate"); - if let Err(err) = s_in.negotiate(None).await { - error!("negotiate err:{} ", err); + down_track_arc + .register_on_close(Box::new(move || { + let s_in = s_out.clone(); + let r_in = r_out.clone(); + let transceiver_in = transceiver.clone(); + let down_track_in = down_track_out.clone(); + Box::pin(async move { + if s_in.pc.connection_state() != RTCPeerConnectionState::Closed { + // Remove track from subscriber peer connection + let rv = s_in.pc.remove_track(&transceiver_in.sender().await).await; + match rv { + Ok(_) => { + info!("Remove DownTrack for {}", &r_in.stream_id()); + s_in.remove_down_track(&r_in.stream_id(), &down_track_in) + .await; + info!("RemoveDownTrack Negotiate"); + if let Err(err) = s_in.negotiate(None).await { + error!("negotiate err:{} ", err); + } } - } - Err(err) => { - if err == RTCError::ErrConnectionClosed { - // return; + Err(err) => { + if err == RTCError::ErrConnectionClosed { + // return; + } } } } - } - })} - )).await; + }) + })) + .await; let s_out_1 = subscriber.clone(); let r_out_1 = receiver.clone(); - down_track_arc.register_on_bind(Box::new(move || { - let s_in = s_out_1.clone(); - let r_in = r_out_1.clone(); + down_track_arc + .register_on_bind(Box::new(move || { + let s_in = s_out_1.clone(); + let r_in = r_out_1.clone(); - Box::pin(async move { - tokio::spawn(async move { - s_in.send_stream_down_track_reports(&r_in.stream_id()).await; - }); - }) - })).await; + Box::pin(async move { + tokio::spawn(async move { + s_in.send_stream_down_track_reports(&r_in.stream_id()).await; + }); + }) + })) + .await; - subscriber.add_down_track(receiver.stream_id(), down_track_arc.clone()).await; - receiver.add_down_track(down_track_arc, self.config.simulcast.best_quality_first).await; + subscriber + .add_down_track(receiver.stream_id(), down_track_arc.clone()) + .await; + receiver + .add_down_track(down_track_arc, self.config.simulcast.best_quality_first) + .await; Ok(None) } - pub async fn add_down_tracks(&self, subscriber: Arc, r: Option>,) -> Result<()> { + pub async fn add_down_tracks( + &self, + subscriber: Arc, + r: Option>, + ) -> Result<()> { if subscriber.no_auto_subscribe { info!("Skipping no auto subscribe"); return Ok(()); } if let Some(receiver) = r { - info!("Add actual downtrack to subscriber, subscriber: {}", subscriber.id); + info!( + "Add actual downtrack to subscriber, subscriber: {}", + subscriber.id + ); self.add_down_track(subscriber.clone(), receiver).await?; subscriber.negotiate(None).await?; return Ok(()); @@ -237,42 +272,72 @@ impl LocalRouter { Ok(()) } - pub async fn add_receiver(&self, receiver: Arc, track: Arc, track_id: String, stream_id: String) -> (Arc, bool) { - info!("add_receiver -> track {}, stream: {}", track.id(), stream_id); + pub async fn add_receiver( + self: &Arc, + receiver: Arc, + track: Arc, + track_id: String, + stream_id: String, + ) -> (Arc, bool) { + info!( + "add_receiver -> track {}, stream: {}", + track.id(), + stream_id + ); let mut published = false; let buffer = self.buffer_factory.get_or_new_buffer(track.ssrc()).await; let sender = self.rtcp_sender_channel.clone(); - buffer.register_on_feedback(Box::new(move |packets: Vec>| { - let sender_in = Arc::clone(&sender); - Box::pin(async move { - if let Err(err) = sender_in.send(packets) { - error!("send err: {}", err); - } - }) - })).await; + buffer + .register_on_feedback(Box::new( + move |packets: Vec>| { + let sender_in = Arc::clone(&sender); + Box::pin(async move { + if let Err(err) = sender_in.send(packets).await { + error!("send err: {}", err); + } + }) + }, + )) + .await; match track.kind() { RTPCodecType::Audio => { - let room_out = self.room.clone(); + let router_out = self.clone(); let stream_id_out = stream_id.clone(); - buffer.register_on_audio_level(Box::new(move |voice, level| { - let room_in = room_out.clone(); - let stream_id_in = stream_id_out.clone(); - Box::pin(async move { - if !voice { - debug!("Skip observation"); - return; - } - room_in.audio_observer.lock().await.observe(&stream_id_in, level).await; - }) - })).await; - debug!("[Room {}] add stream {} to audio observer", self.room.id, stream_id); - self.room.audio_observer.lock().await.add_stream(stream_id).await; - }, + buffer + .register_on_audio_level(Box::new(move |voice, level| { + let router_in = router_out.clone(); + let stream_id_in = stream_id_out.clone(); + Box::pin(async move { + if !voice { + debug!("Skip observation"); + return; + } + router_in + .audio_observer + .lock() + .await + .observe(&stream_id_in, level) + .await; + }) + })) + .await; + debug!( + "[Room {}] add stream {} to audio observer", + self.room.id, stream_id + ); + let router_2 = self.clone(); + router_2 + .audio_observer + .lock() + .await + .add_stream(stream_id) + .await; + } RTPCodecType::Video => { debug!("Video tracking not implemented"); - }, - _ => {}, + } + _ => {} } // TODO: implement twcc @@ -285,13 +350,16 @@ impl LocalRouter { //let stats_out = Arc::clone(&self.stats); let buffer_out = Arc::clone(&buffer); let with_status = self.config.with_stats; - rtcp_reader.lock().await.register_on_packet(Box::new(move |packet: Vec| { - let buffer_in = Arc::clone(&buffer_out); - Box::pin(async move { - let mut buf = &packet[..]; - let pkts_result = webrtc::rtcp::packet::unmarshal(&mut buf)?; - for pkt in pkts_result { - if let Some(description) = + rtcp_reader + .lock() + .await + .register_on_packet(Box::new(move |packet: Vec| { + let buffer_in = Arc::clone(&buffer_out); + Box::pin(async move { + let mut buf = &packet[..]; + let pkts_result = webrtc::rtcp::packet::unmarshal(&mut buf)?; + for pkt in pkts_result { + if let Some(description) = pkt.as_any() .downcast_ref::() { @@ -313,16 +381,18 @@ impl LocalRouter { // TODO: update stats } } - } - Ok(()) - }) - })).await; + } + Ok(()) + }) + })) + .await; let receivers = self.receivers.lock().await; let result_receiver; match receivers.get(&track.id()) { - Some(r)=>result_receiver = r.clone(), - None=>{ - let mut rv = WebRTCReceiver::new(receiver.clone(), track.clone(), self.id.clone()).await; + Some(r) => result_receiver = r.clone(), + None => { + let mut rv = + WebRTCReceiver::new(receiver.clone(), track.clone(), self.id.clone()).await; rv.set_rtcp_channel(self.rtcp_sender_channel.clone()); let recv_kind = rv.kind(); let room_out = self.room.clone(); @@ -332,13 +402,16 @@ impl LocalRouter { let stream_id_in = stream_id.clone(); Box::pin(async move { if recv_kind == RTPCodecType::Audio { - room_in.audio_observer - .lock().await - .remove_stream(&stream_id_in).await; - } + room_in + .audio_observer + .lock() + .await + .remove_stream(&stream_id_in) + .await; } - ) - })).await; + }) + })) + .await; result_receiver = Arc::new(rv); receivers.insert(track_id, result_receiver.clone()); published = true; @@ -372,7 +445,7 @@ impl LocalRouter { let buffer_clone = buffer.clone(); tokio::spawn(async move { let mut b = vec![0u8; 1500]; - + while let Ok((pkt, _)) = track.read(&mut b).await { if let Err(err) = buffer_clone.write(pkt).await { error!("write error: {}", err); @@ -384,14 +457,54 @@ impl LocalRouter { (result_receiver, published) } + pub async fn start_audio_observer_task(&self) { + let mut stop_receiver = self.stop_sender_channel.subscribe(); + let id_out = self.id.clone(); + let observer = self.audio_observer.clone(); + let interval_ms = observer.lock().await.interval as u64; + let interval = Duration::from_millis(interval_ms); + let room_out = self.room.clone(); + tokio::spawn(async move { + loop { + tokio::select! { + _ = sleep(interval) => { + let is_empty = { + observer.lock().await.is_empty().await + }; + + if is_empty { + info!("Continue"); + continue; + } + + let streams = { + observer.lock().await.calc().await + }; + + if let Some(streams) = streams { + info!("Streams {:?}", streams); + room_out.send_message(RoomEvent::VoiceActivity { room_id: room_out.id.clone(), stream_ids: streams }).await; + } + } + _ = stop_receiver.recv() => { + info!("[Router {}] Stopping audio observer task", id_out); + break; + } + } + } + }); + info!("Audio observer task started"); + } + pub async fn set_rtcp_writer(&self, writer: RtcpWriterFn) { let mut handler = self.rtcp_writer_handler.lock().await; *handler = Some(writer); } pub async fn send_rtcp(&self) { + let mut stop = self.stop_sender_channel.subscribe(); + let mut rtcp_receiver = self.rtcp_receiver_channel.lock().await; loop { - let mut rtcp_receiver = self.rtcp_receiver_channel.lock().await; - let mut stop_receiver = self.stop_receiver_channel.lock().await; + //let mut rtcp_receiver = self.rtcp_receiver_channel.lock().await; tokio::select! { data = rtcp_receiver.recv() => { if let Some(val) = data{ @@ -400,7 +513,7 @@ impl LocalRouter { } } } - _data = stop_receiver.recv() => { + _data = stop.recv() => { info!("Stop receiver signal. Exiting loop"); return ; } @@ -408,8 +521,8 @@ impl LocalRouter { } } pub async fn stop(&self) { - if let Err(err) = self.stop_sender_channel.lock().await.send(()) { + if let Err(err) = self.stop_sender_channel.send(()) { error!("stop err: {}", err); } } -} \ No newline at end of file +} From c15f3e0a369ee1307c6495c3864b7318a1607228 Mon Sep 17 00:00:00 2001 From: Ian Parra Date: Thu, 19 Jun 2025 16:47:03 -0400 Subject: [PATCH 35/36] fix: fix current config.toml path --- Dockerfile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Dockerfile b/Dockerfile index 21508b5..5a4e45a 100644 --- a/Dockerfile +++ b/Dockerfile @@ -15,7 +15,7 @@ RUN cargo install --locked --path server --root /usr/local # Bundle FROM gcr.io/distroless/cc-debian12 COPY --from=build /usr/local/bin/server /etc/volcano/volcano-server -COPY server/config.toml /etc/volcano/config.toml +COPY config.toml /etc/volcano/config.toml # Signaling server port EXPOSE 4000/tcp From e80f0ab0bcb1b6d791eb1c0de7c5bc255650dd06 Mon Sep 17 00:00:00 2001 From: Ian Parra Date: Thu, 19 Jun 2025 16:49:12 -0400 Subject: [PATCH 36/36] fix: use config example for test environment --- Dockerfile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Dockerfile b/Dockerfile index 5a4e45a..cf14092 100644 --- a/Dockerfile +++ b/Dockerfile @@ -15,7 +15,7 @@ RUN cargo install --locked --path server --root /usr/local # Bundle FROM gcr.io/distroless/cc-debian12 COPY --from=build /usr/local/bin/server /etc/volcano/volcano-server -COPY config.toml /etc/volcano/config.toml +COPY config.example.toml /etc/volcano/config.toml # Signaling server port EXPOSE 4000/tcp