diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml index 46e2519..43004ec 100644 --- a/.github/workflows/rust.yml +++ b/.github/workflows/rust.yml @@ -16,23 +16,23 @@ jobs: matrix: os: [ubuntu-latest, windows-latest] steps: - - uses: actions/checkout@v2 + - uses: actions/checkout@v5 - name: Install alsa dev - if: runner.os == 'Linux' + if: runner.os == 'Linux' run: | sudo apt-get update sudo apt-get install libasound2-dev - - uses: Swatinem/rust-cache@v1 + - uses: Swatinem/rust-cache@v2 - name: Build run: cargo build -p kissmp-bridge --verbose --release - name: Store Artifacts - uses: actions/upload-artifact@v3 + uses: actions/upload-artifact@v4 with: - name: build_results_bridge + name: build_results_bridge-${{ matrix.os }} path: | ./target/release/kissmp-bridge ./target/release/kissmp-bridge.exe @@ -41,19 +41,19 @@ jobs: runs-on: ${{ matrix.os }} strategy: matrix: - os: [ubuntu-20.04, windows-latest] + os: [ubuntu-latest, windows-latest] steps: - - uses: actions/checkout@v2 + - uses: actions/checkout@v5 - - uses: Swatinem/rust-cache@v1 + - uses: Swatinem/rust-cache@v2 - name: Build run: cargo build -p kissmp-server --verbose --release - name: Store Artifacts - uses: actions/upload-artifact@v3 + uses: actions/upload-artifact@v4 with: - name: build_results_server + name: build_results_server-${{ matrix.os }} path: | ./target/release/kissmp-server ./target/release/kissmp-server.exe @@ -62,17 +62,17 @@ jobs: runs-on: ${{ matrix.os }} strategy: matrix: - os: [ubuntu-20.04] + os: [ubuntu-latest] steps: - - uses: actions/checkout@v2 + - uses: actions/checkout@v5 - - uses: Swatinem/rust-cache@v1 + - uses: Swatinem/rust-cache@v2 - name: Build run: cargo build -p kissmp-master --verbose --release - name: Store Artifacts - uses: actions/upload-artifact@v3 + uses: actions/upload-artifact@v4 with: name: build_results_master_server path: | diff --git a/Cargo.toml b/Cargo.toml index 6ed1544..2a8d691 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -9,3 +9,7 @@ default-members = [ "kissmp-bridge", "kissmp-server" ] +[workspace.package] +version = "0.6.0" +authors = ["hellbox"] +edition = "2018" \ No newline at end of file diff --git a/KISSMultiplayer/lua/ge/extensions/kissmp/ui/names.lua b/KISSMultiplayer/lua/ge/extensions/kissmp/ui/names.lua index be5dd11..63cdc40 100644 --- a/KISSMultiplayer/lua/ge/extensions/kissmp/ui/names.lua +++ b/KISSMultiplayer/lua/ge/extensions/kissmp/ui/names.lua @@ -25,7 +25,9 @@ local function draw() ColorF(1, 1, 1, 1), true, false, - ColorI(0, 0, 0, 255) + ColorI(0, 0, 0, 255), + false, + false ) end end diff --git a/KISSMultiplayer/lua/ge/extensions/kissmp/ui/tabs/server_list.lua b/KISSMultiplayer/lua/ge/extensions/kissmp/ui/tabs/server_list.lua index 6b2921c..d25b0ac 100644 --- a/KISSMultiplayer/lua/ge/extensions/kissmp/ui/tabs/server_list.lua +++ b/KISSMultiplayer/lua/ge/extensions/kissmp/ui/tabs/server_list.lua @@ -1,7 +1,7 @@ local M = {} local imgui = ui_imgui local http = require("socket.http") -local VERSION_PRTL = "0.6.0" +local VERSION_PRTL = "0.7.0" local filter_servers_notfull = imgui.BoolPtr(false) local filter_servers_notempty = imgui.BoolPtr(false) diff --git a/KISSMultiplayer/lua/ge/extensions/network.lua b/KISSMultiplayer/lua/ge/extensions/network.lua index 9024bec..f41c3d8 100644 --- a/KISSMultiplayer/lua/ge/extensions/network.lua +++ b/KISSMultiplayer/lua/ge/extensions/network.lua @@ -1,6 +1,6 @@ local M = {} -M.VERSION_STR = "0.6.0" +M.VERSION_STR = "0.7.0" M.downloads = {} M.downloading = false @@ -28,6 +28,7 @@ M.connection = { } local FILE_TRANSFER_CHUNK_SIZE = 16384; +local CHUNK_SIZE = 65000 -- Safe size under 65536 limit local message_handlers = {} @@ -179,16 +180,45 @@ local function send_data(raw_data, reliable) print("NOT IMPLEMENTED. PLEASE REPORT TO KISSMP DEVELOPERS. CODE: "..raw_data) return end + if not M.connection.connected then return -1 end local data = "" - -- Used in context of it being called from vehicle lua, where it's already encoded into json if type(raw_data) == "string" then data = raw_data else data = jsonEncode(raw_data) end - if not M.connection.connected then return -1 end - local len = #data - local len = ffi.string(ffi.new("uint32_t[?]", 1, {len}), 4) + local data_size = #data + -- Auto-chunk if data is too large + if data_size > CHUNK_SIZE then + print("Large data detected: " .. data_size .. " bytes, sending in chunks") + local num_chunks = math.ceil(data_size / CHUNK_SIZE) + + for i = 0, num_chunks - 1 do + local start_pos = i * CHUNK_SIZE + 1 + local end_pos = math.min((i + 1) * CHUNK_SIZE, data_size) + local chunk = data:sub(start_pos, end_pos) + + local chunk_data = jsonEncode({ + DataChunk = { + chunk_index = i, + total_chunks = num_chunks, + data = chunk + } + }) + + local len = ffi.string(ffi.new("uint32_t[?]", 1, {#chunk_data}), 4) + M.connection.tcp:send(string.char(1)..len) + M.connection.tcp:send(chunk_data) + + print("Sent chunk " .. (i + 1) .. "/" .. num_chunks) + end + + print("All chunks sent successfully") + return 0 + end + + -- Send normally + local len = ffi.string(ffi.new("uint32_t[?]", 1, {data_size}), 4) if reliable then reliable = 1 else @@ -196,6 +226,7 @@ local function send_data(raw_data, reliable) end M.connection.tcp:send(string.char(reliable)..len) M.connection.tcp:send(data) + return 0 end local function sanitize_addr(addr) @@ -277,7 +308,7 @@ local function connect(addr, player_name) local steamid64 = nil if Steam and Steam.isWorking then - steamid64 = Steam.getAccountIDStr() ~= "0" and Steam.getAccountIDStr() or nil + steamid64 = Steam.accountID ~= "0" and Steam.accountID or nil end local client_info = { @@ -285,7 +316,7 @@ local function connect(addr, player_name) name = player_name, secret = generate_secret(server_info.server_identifier), steamid64 = steamid64, - client_version = {0, 6} + client_version = {0, 7} } } send_data(client_info, true) diff --git a/kissmp-bridge/Cargo.toml b/kissmp-bridge/Cargo.toml index 3d152ab..1be2a94 100644 --- a/kissmp-bridge/Cargo.toml +++ b/kissmp-bridge/Cargo.toml @@ -1,8 +1,8 @@ [package] name = "kissmp-bridge" -version = "0.6.0" -authors = ["hellbox"] -edition = "2018" +version = { workspace = true } +authors = { workspace = true } +edition = { workspace = true } # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [features] @@ -32,3 +32,4 @@ cpal = "0.13" fon = "0.5.0" log = "0.4" indoc = "1.0" +rcgen = "0.8.2" diff --git a/kissmp-bridge/src/main.rs b/kissmp-bridge/src/main.rs index 8d5841e..b5e5f9d 100644 --- a/kissmp-bridge/src/main.rs +++ b/kissmp-bridge/src/main.rs @@ -92,32 +92,123 @@ async fn connect_to_server( discord_tx: std::sync::mpsc::Sender, ) -> () { let endpoint = { - let rustls_config = rustls::ClientConfig::builder() + // Generate certificate first + let cert = rcgen::generate_simple_self_signed(vec!["kissmp".into()]).unwrap(); + let key = rustls::PrivateKey(cert.serialize_private_key_der()); + let cert = rustls::Certificate(cert.serialize_der().unwrap()); + + // Create crypto config with client auth + let mut crypto = rustls::ClientConfig::builder() .with_safe_defaults() .with_custom_certificate_verifier(Arc::new(AcceptAnyCertificate)) - .with_no_client_auth(); - let mut client_cfg = quinn::ClientConfig::new(Arc::new(rustls_config)); + .with_client_cert_resolver(Arc::new(ClientCertResolver { + cert: cert.clone(), + key: key.clone(), + })); + crypto.alpn_protocols = vec![b"kissmp".to_vec()]; + crypto.enable_early_data = true; + let mut client_cfg = quinn::ClientConfig::new(Arc::new(crypto)); + let mut transport = quinn::TransportConfig::default(); transport.max_idle_timeout(Some(IdleTimeout::try_from(SERVER_IDLE_TIMEOUT).unwrap())); - client_cfg.transport = std::sync::Arc::new(transport); + transport.keep_alive_interval(Some(std::time::Duration::from_secs(2))); + client_cfg.transport = Arc::new(transport); - let mut endpoint = quinn::Endpoint::client(addr).unwrap(); + let mut endpoint = quinn::Endpoint::client( + SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 0) + ).unwrap(); endpoint.set_default_client_config(client_cfg); endpoint }; - let server_connection = match endpoint.connect(addr, "kissmp").unwrap().await { - Ok(c) => c, + info!("Attempting to connect to the server at {}", addr); + let mut server_connection = match endpoint.connect(addr, "kissmp").unwrap().await { + Ok(c) => { + info!("Successfully connected to the server at {}", addr); + c + } + Err(e) => { + error!("Failed to connect to the server at {}: {}", addr, e); + return; + } + }; + + // Send initial client info to establish connection + let client_info = shared::ClientCommand::ClientInfo(shared::ClientInfoPrivate { + name: "Bridge Client".to_string(), + client_version: shared::VERSION, + secret: String::from("bridge"), + steamid64: None, + }); + + let client_info_data = bincode::serialize(&client_info).unwrap(); + + // Send client info through reliable stream + let mut send_stream = match server_connection.connection.open_uni().await { + Ok(stream) => stream, Err(e) => { - error!("Failed to connect to the server: {}", e); + error!("Failed to open send stream: {}", e); return; } }; + if let Err(e) = send(&mut send_stream, &client_info_data).await { + error!("Failed to send client info: {}", e); + return; + } + + // Wait for server info response + let mut stream = match server_connection.uni_streams.next().await { + Some(Ok(stream)) => stream, + Some(Err(e)) => { + error!("Error receiving server info stream: {}", e); + return; + } + None => { + error!("No server info stream received"); + return; + } + }; + + let mut buf = [0; 4]; + if let Err(e) = stream.read_exact(&mut buf).await { + error!("Failed to read server info length: {}", e); + return; + } + let len = u32::from_le_bytes(buf) as usize; + let mut data = vec![0; len]; + if let Err(e) = stream.read_exact(&mut data).await { + error!("Failed to read server info data: {}", e); + return; + } + + let server_info = match bincode::deserialize::(&data) { + Ok(shared::ServerCommand::ServerInfo(info)) => info, + _ => { + error!("Invalid server info received"); + return; + } + }; + + info!("Connected to server: {}", server_info.name); + + // Send server info to game client + let server_info_bytes = server_command_to_client_bytes( + shared::ServerCommand::ServerInfo(server_info.clone()) + ); + let (client_stream_reader, mut client_stream_writer) = tokio::io::split(client_stream); - let _ = client_stream_writer.write_all(CONNECTED_BYTE).await; + if let Err(e) = client_stream_writer.write_all(CONNECTED_BYTE).await { + error!("Failed to send connection byte to client: {}", e); + return; + } + + if let Err(e) = client_stream_writer.write_all(&server_info_bytes).await { + error!("Failed to send server info to game: {}", e); + return; + } let (client_event_sender, client_event_receiver) = tokio::sync::mpsc::unbounded_channel::<(bool, shared::ClientCommand)>(); @@ -133,10 +224,10 @@ async fn connect_to_server( match voice_chat::try_create_vc_playback_task(vc_playback_receiver) { Ok(handle) => { non_critical_tasks.push(handle); - debug!("Playback OK") + info!("Voice chat playback task created successfully"); } Err(e) => { - error!("Failed to set up voice chat playback: {}", e) + error!("Failed to set up voice chat playback: {}", e); } }; @@ -146,16 +237,16 @@ async fn connect_to_server( ) { Ok(handle) => { non_critical_tasks.push(handle); - debug!("Recording OK") + info!("Voice chat recording task created successfully"); } Err(e) => { - error!("Failed to set up voice chat recording: {}", e) + error!("Failed to set up voice chat recording: {}", e); } }; tokio::spawn(async move { - debug!("Starting tasks"); - match tokio::try_join!( + info!("Starting tasks"); + let result = tokio::try_join!( async { while let Some(result) = non_critical_tasks.next().await { match result { @@ -180,14 +271,25 @@ async fn connect_to_server( vc_playback_sender, server_connection ), - ) { - Ok(_) => debug!("Tasks completed successfully"), - Err(e) => warn!("Tasks ended due to exception: {}", e), + ); + + match result { + Ok(_) => info!("Tasks completed successfully"), + Err(e) => { + error!("Tasks ended due to exception: {}", e); + discord_tx.send(DiscordState { server_name: None }).unwrap(); + } } - discord_tx.send(DiscordState { server_name: None }).unwrap(); }); } +async fn send(stream: &mut quinn::SendStream, message: &[u8]) -> anyhow::Result<()> { + stream.write_all(&(message.len() as u32).to_le_bytes()).await?; + stream.write_all(message).await?; + stream.finish().await?; + Ok(()) +} + fn server_command_to_client_bytes(command: shared::ServerCommand) -> Vec { match command { shared::ServerCommand::FilePart(name, data, chunk_n, file_size, data_left) => { @@ -236,9 +338,13 @@ async fn server_incoming( vc_playback_sender: std::sync::mpsc::Sender, server_connection: quinn::NewConnection, ) -> AHResult { - let mut reliable_commands = server_connection - .uni_streams - .map(|stream| async { Ok::<_, anyhow::Error>(read_pascal_bytes(&mut stream?).await?) }); + let mut reliable_commands = server_connection.uni_streams + .map(|stream| async { + let mut stream = stream?; + read_pascal_bytes(&mut stream).await + }) + .buffered(256) + .fuse(); let mut unreliable_commands = server_connection .datagrams @@ -246,26 +352,48 @@ async fn server_incoming( .buffer_unordered(1024); loop { - let command_bytes = tokio::select! { - Some(reliable_command) = reliable_commands.next() => { - reliable_command.await? + tokio::select! { + command = reliable_commands.next() => match command { + Some(Ok(bytes)) => { + let command = bincode::deserialize::(&bytes)?; + match command { + shared::ServerCommand::VoiceChatPacket(client, pos, data) => { + let _ = vc_playback_sender.send(voice_chat::VoiceChatPlaybackEvent::Packet( + client, pos, data, + )); + } + _ => server_commands_sender.send(command).await?, + } + } + Some(Err(e)) => { + warn!("Error reading reliable command: {}", e); + break; + } + None => break, }, - Some(unreliable_command) = unreliable_commands.next() => { - unreliable_command? + command = unreliable_commands.next() => match command { + Some(Ok(bytes)) => { + if let Ok(command) = bincode::deserialize::(&bytes) { + match command { + shared::ServerCommand::VoiceChatPacket(client, pos, data) => { + let _ = vc_playback_sender.send(voice_chat::VoiceChatPlaybackEvent::Packet( + client, pos, data, + )); + } + _ => server_commands_sender.send(command).await?, + } + } + } + Some(Err(e)) => { + warn!("Error reading unreliable command: {}", e); + break; + } + None => break, }, - else => break - }; - let command = bincode::deserialize::(command_bytes.as_ref())?; - match command { - shared::ServerCommand::VoiceChatPacket(client, pos, data) => { - let _ = vc_playback_sender.send(voice_chat::VoiceChatPlaybackEvent::Packet( - client, pos, data, - )); - } - _ => server_commands_sender.send(command).await?, - }; + else => break, + } } - debug!("Server incoming closed"); + info!("Server incoming closed"); Ok(()) } @@ -342,3 +470,27 @@ impl rustls::client::ServerCertVerifier for AcceptAnyCertificate { Ok(rustls::client::ServerCertVerified::assertion()) } } + +struct ClientCertResolver { + cert: rustls::Certificate, + key: rustls::PrivateKey, +} + +impl rustls::client::ResolvesClientCert for ClientCertResolver { + fn resolve( + &self, + _acceptable_issuers: &[&[u8]], + _sigschemes: &[rustls::SignatureScheme], + ) -> Option> { + let signing_key = rustls::sign::any_supported_type(&self.key) + .expect("Failed to load private key"); + Some(Arc::new(rustls::sign::CertifiedKey::new( + vec![self.cert.clone()], + signing_key, + ))) + } + + fn has_certs(&self) -> bool { + true + } +} \ No newline at end of file diff --git a/kissmp-master/Cargo.toml b/kissmp-master/Cargo.toml index 94d9a77..1d32582 100644 --- a/kissmp-master/Cargo.toml +++ b/kissmp-master/Cargo.toml @@ -1,8 +1,8 @@ [package] name = "kissmp-master" -version = "0.6.0" -authors = ["TheHellBox "] -edition = "2018" +version = { workspace = true } +authors = { workspace = true } +edition = { workspace = true } # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html diff --git a/kissmp-server/Cargo.toml b/kissmp-server/Cargo.toml index c478c55..2d0008b 100644 --- a/kissmp-server/Cargo.toml +++ b/kissmp-server/Cargo.toml @@ -1,8 +1,8 @@ [package] name = "kissmp-server" -version = "0.6.0" -authors = ["hellbox"] -edition = "2018" +version = { workspace = true } +authors = { workspace = true } +edition = { workspace = true } # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html diff --git a/kissmp-server/src/events.rs b/kissmp-server/src/events.rs index ec611d0..e1fb415 100644 --- a/kissmp-server/src/events.rs +++ b/kissmp-server/src/events.rs @@ -282,6 +282,36 @@ impl Server { let _ = client.conn.send_datagram(data.clone().into()); } } + DataChunk { chunk_index, total_chunks, data } => { + // info!("Received chunk {}/{} from client {}", chunk_index + 1, total_chunks, client_id); + let chunks = self.chunk_buffers + .entry(client_id) + .or_insert_with(HashMap::new) + .entry(total_chunks) + .or_insert_with(|| vec![String::new(); total_chunks as usize]); + + chunks[chunk_index as usize] = data; + + if chunks.iter().all(|c| !c.is_empty()) { + // info!("All {} chunks received, reassembling data", total_chunks); + let full_json = chunks.join(""); + // Parse and recursively handle reassembled command + match serde_json::from_str::(&full_json) { + Ok(original_command) => { + // Box to avoid infinite type size + Box::pin(self.on_client_event( + client_id, + IncomingEvent::ClientCommand(original_command) + )).await; + } + Err(e) => { + error!("Failed to parse reassembled JSON: {}", e); + } + } + // Clear chunks for this client + self.chunk_buffers.get_mut(&client_id).unwrap().remove(&total_chunks); + } + } _ => {} } } diff --git a/kissmp-server/src/lib.rs b/kissmp-server/src/lib.rs index 15245ec..ed24dbe 100644 --- a/kissmp-server/src/lib.rs +++ b/kissmp-server/src/lib.rs @@ -19,7 +19,7 @@ use vehicle::*; use anyhow::{Context, Error}; use futures::FutureExt; use futures::{select, StreamExt, TryStreamExt}; -use log::{error, info, warn}; +use log::{error, info, warn, debug}; use std::collections::HashMap; use std::convert::TryFrom; use std::net::{IpAddr, Ipv4Addr, SocketAddr, UdpSocket}; @@ -68,6 +68,7 @@ pub struct Server { vehicles: HashMap, // Client ID, game_id, server_id vehicle_ids: HashMap>, + chunk_buffers: HashMap>>, // client_id -> (total_chunks -> chunks) reqwest_client: reqwest::Client, name: String, description: String, @@ -100,6 +101,7 @@ impl Server { reqwest_client: reqwest::Client::new(), vehicles: HashMap::with_capacity(64), vehicle_ids: HashMap::with_capacity(64), + chunk_buffers: HashMap::new(), name: config.server_name, description: config.description, map: config.map, @@ -127,6 +129,7 @@ impl Server { setup_result: Option>, ) { let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), self.port); + info!("Server is starting on {}", addr); if self.upnp_enabled { if let Some(port) = upnp_pf(self.port) { info!("uPnP mapping succeeded. Port: {}", port); @@ -167,6 +170,7 @@ impl Server { .with_no_client_auth() .with_single_cert(vec![cert], key) .unwrap(); + server_crypto.alpn_protocols.push(b"kissmp".to_vec()); let mut server_config = quinn::ServerConfig::with_crypto(Arc::new(server_crypto)); @@ -174,9 +178,11 @@ impl Server { transport.max_idle_timeout(Some( IdleTimeout::try_from(std::time::Duration::from_secs(60)).unwrap(), )); + transport.keep_alive_interval(Some(std::time::Duration::from_secs(2))); server_config.transport = std::sync::Arc::new(transport); let (_endpoint, incoming) = quinn::Endpoint::server(server_config, addr).unwrap(); + info!("Server is listening on {}", addr); let (client_events_tx, client_events_rx) = mpsc::channel(128); let mut client_events_rx = ReceiverStream::new(client_events_rx).fuse(); @@ -212,9 +218,12 @@ impl Server { } conn = incoming.select_next_some() => { if let Ok(conn) = conn { + info!("New connection attempt from {:?}", conn.connection.remote_address()); if let Err(e) = self.on_connect(conn, client_events_tx.clone()).await { - warn!("Client has failed to connect to the server"); + warn!("Client has failed to connect to the server: {}", e); } + } else { + warn!("Failed to accept incoming connection: {:?}", conn); } }, stdin_input = reader.next() => { @@ -247,7 +256,7 @@ impl Server { } } async fn send_server_info(&self) -> anyhow::Result<()> { - if !self.show_in_list { + if (!self.show_in_list) { return Ok(()); } let server_info = serde_json::json!({ @@ -278,6 +287,31 @@ impl Server { mut new_connection: quinn::NewConnection, client_events_tx: mpsc::Sender<(u32, IncomingEvent)>, ) -> anyhow::Result<()> { + info!("Connection handshake starting..."); + + info!("Connection stats: {:?}", new_connection.connection.stats()); + + // timeout for receiving client info + let client_info = tokio::time::timeout( + std::time::Duration::from_secs(5), + async { + let mut stream = new_connection.uni_streams.try_next().await?; + if let Some(stream) = &mut stream { + info!("Receiving client info stream..."); + let mut buf = [0; 4]; + stream.read_exact(&mut buf[0..4]).await?; + let len = u32::from_le_bytes(buf).min(16384) as usize; + info!("Expected client info length: {}", len); + let mut buf: Vec = vec![0; len]; + stream.read_exact(&mut buf).await?; + info!("Received client info bytes: {} bytes", buf.len()); + Ok(buf) + } else { + Err(anyhow::Error::msg("No client info stream received")) + } + } + ).await; + let connection = new_connection.connection.clone(); if self.connections.len() >= self.max_players.into() { connection.close(0u32.into(), b"Server is full"); @@ -285,6 +319,9 @@ impl Server { } // Should be strong enough for our targets. TODO: Check for collisions anyway let id = rand::random::(); + + info!("Client connected with ID: {}", id); + let (ordered_tx, ordered_rx) = mpsc::channel(128); let (unreliable_tx, unreliable_rx) = mpsc::channel(128); async fn receive_client_data( @@ -296,23 +333,29 @@ impl Server { let mut buf = [0; 4]; stream.read_exact(&mut buf[0..4]).await?; let len = u32::from_le_bytes(buf).min(16384) as usize; + info!("Client info length: {}", len); let mut buf: Vec = vec![0; len]; stream.read_exact(&mut buf).await?; + info!("Received raw client info data"); let info: shared::ClientCommand = bincode::deserialize::(&buf)?; + info!("Deserialized client info"); + info!("[DEBUG] Received client command: {:?}", info); if let shared::ClientCommand::ClientInfo(info) = info { + info!("Got client info: {:?}", info); Ok(info) } else { - Err(anyhow::Error::msg("Failed to fetch client info")) + Err(anyhow::Error::msg("Failed to fetch client info - wrong command type")) } } else { - Err(anyhow::Error::msg("Failed to fetch client info")) + Err(anyhow::Error::msg("Failed to fetch client info - no stream")) } } let connection_clone = connection.clone(); // Receiver tokio::spawn(async move { + info!("[CONNECT_TASK] Starting connection task for {}", id); let client_info = { if let Ok(client_data) = receive_client_data(&mut new_connection).await { client_data @@ -354,6 +397,7 @@ impl Server { .send((id, IncomingEvent::ClientConnected(client_connection))) .await .unwrap(); + info!("[CONNECT_TASK] Starting drive_receive for {}", id); if let Err(_e) = Self::drive_receive( id, new_connection.uni_streams, @@ -382,10 +426,24 @@ impl Server { .unwrap(); // Sender tokio::spawn(async move { - let mut stream = connection.open_uni().await; - if let Ok(stream) = &mut stream { - let _ = send(stream, &server_info).await; - let _ = Self::drive_send(connection, ordered_rx, unreliable_rx).await; + let mut stream = match connection.open_uni().await { + Ok(stream) => stream, + Err(e) => { + error!("Failed to open server info stream: {}", e); + return; + } + }; + info!("[DEBUG] Sender task started for {}", id); + if let Err(e) = send(&mut stream, &server_info).await { + error!("Failed to send server info: {}", e); + return; + } + info!("[DEBUG] Server info sent to {}", id); + // debug!("Sent server info to client"); + + // Start driving connection + if let Err(e) = Self::drive_send(connection, ordered_rx, unreliable_rx).await { + error!("Connection drive_send error: {}", e); } }); Ok(()) @@ -436,6 +494,7 @@ impl Server { datagrams: quinn::Datagrams, mut client_events_tx: mpsc::Sender<(u32, IncomingEvent)>, ) -> anyhow::Result<()> { + info!("[DEBUG] Starting drive_receive for {}", id); let mut cmds = streams .map(|stream| async { let mut stream = stream?; @@ -538,10 +597,9 @@ fn generate_certificate() -> (rustls::Certificate, rustls::PrivateKey) { } async fn send(stream: &mut quinn::SendStream, message: &[u8]) -> anyhow::Result<()> { - stream - .write_all(&(message.len() as u32).to_le_bytes()) - .await?; - stream.write_all(&message).await?; + stream.write_all(&(message.len() as u32).to_le_bytes()).await?; + stream.write_all(message).await?; + stream.finish().await?; Ok(()) } @@ -610,7 +668,7 @@ pub fn list_mods( }; match r { Ok(p) => { - if !p.exists() { + if (!p.exists()) { error!("Mod file {:?} not found", p); continue; } diff --git a/shared/Cargo.toml b/shared/Cargo.toml index e170329..ba4e068 100644 --- a/shared/Cargo.toml +++ b/shared/Cargo.toml @@ -1,8 +1,8 @@ [package] name = "shared" -version = "0.6.0" -authors = ["TheHellBox "] -edition = "2018" +version = { workspace = true } +authors = { workspace = true } +edition = { workspace = true } # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html diff --git a/shared/src/lib.rs b/shared/src/lib.rs index 3da3b4b..c1a3e95 100644 --- a/shared/src/lib.rs +++ b/shared/src/lib.rs @@ -7,8 +7,8 @@ use std::io::Write; use chrono::Local; pub use log::{info, warn, error}; -pub const VERSION: (u32, u32) = (0, 6); -pub const VERSION_STR: &str = "0.6.0"; +pub const VERSION: (u32, u32) = (0, 7); +pub const VERSION_STR: &str = "0.7.0"; #[derive(Serialize, Deserialize, Debug, Clone)] pub struct ClientInfoPrivate { @@ -82,6 +82,11 @@ pub enum ClientCommand { StartTalking, // Only used by bridge EndTalking, + DataChunk { + chunk_index: u32, + total_chunks: u32, + data: String, + }, Ping(u16), }