From 7a00c8ef0bd1ed66bebc1f1eec912fa204384696 Mon Sep 17 00:00:00 2001 From: Carlen White Date: Fri, 24 Dec 2021 07:33:42 -0500 Subject: [PATCH 1/5] Downloads handled by bridge instead of mod The bridge will be what handles downloading the mods instead of relying on the mod-side to write the files itself. The bridge communicates the progress of the file, however. Also introduces a basic state sharing framework but at the moment is used to share where mods should be downloaded to and a to-be-implemented switch to enable unix compatibility should the bridge not need to fix the given path download path. State is initially given by the mod on connect. Additionally a couple of miscellaneous things were changed that didn't age too well or were changed to be reused. --- .../lua/ge/extensions/kissmods.lua | 10 +- KISSMultiplayer/lua/ge/extensions/network.lua | 145 ++++---- kissmp-bridge/Cargo.toml | 3 +- kissmp-bridge/src/main.rs | 323 ++++++++++++++---- shared/src/lib.rs | 16 +- shared/src/state.rs | 8 + 6 files changed, 355 insertions(+), 150 deletions(-) create mode 100644 shared/src/state.rs diff --git a/KISSMultiplayer/lua/ge/extensions/kissmods.lua b/KISSMultiplayer/lua/ge/extensions/kissmods.lua index da4635a..b640b7a 100644 --- a/KISSMultiplayer/lua/ge/extensions/kissmods.lua +++ b/KISSMultiplayer/lua/ge/extensions/kissmods.lua @@ -123,18 +123,13 @@ local function set_mods_list(mod_list) end end -local function open_file(name) - if not string.endswith(name, ".zip") then return end +local function get_mod_directory() if not FS:directoryExists("/kissmp_mods/") then FS:directoryCreate("/kissmp_mods/") end - local path = "/kissmp_mods/"..name - print(path) - local file = io.open(path, "wb") - return file + return FS:getFileRealPath("/kissmp_mods/") end -M.open_file = open_file M.check_mods = check_mods M.is_special_mod = is_special_mod M.mount_mod = mount_mod @@ -143,5 +138,6 @@ M.deactivate_all_mods = deactivate_all_mods M.set_mods_list = set_mods_list M.update_status_all = update_status_all M.update_status = update_status +M.get_mod_directory = get_mod_directory return M diff --git a/KISSMultiplayer/lua/ge/extensions/network.lua b/KISSMultiplayer/lua/ge/extensions/network.lua index ec9028f..e631fcc 100644 --- a/KISSMultiplayer/lua/ge/extensions/network.lua +++ b/KISSMultiplayer/lua/ge/extensions/network.lua @@ -2,8 +2,6 @@ local M = {} M.VERSION_STR = "0.5.0" -M.downloads = {} -M.downloading = false M.downloads_status = {} local current_download = nil @@ -12,6 +10,13 @@ local socket = require("socket") local messagepack = require("lua/common/libs/Lua-MessagePack/MessagePack") local ping_send_time = 0 +-- shared::State +-- Should be in it's own module, but for now this is fine. +local state = { + download_directory = kissmods.get_mod_directory(), + disregard_unix_path_correction = false +} + M.players = {} M.socket = socket M.base_secret = "None" @@ -75,27 +80,6 @@ local function disconnect(data) returnToMainMenu() end -local function handle_disconnected(data) - disconnect(data) -end - -local function handle_file_transfer(data) - kissui.show_download = true - local file_len = ffi.cast("uint32_t*", ffi.new("char[?]", 5, data:sub(1, 4)))[0] - local file_name = data:sub(5, #data) - local chunks = math.floor(file_len / FILE_TRANSFER_CHUNK_SIZE) - - current_download = { - file_len = file_len, - file_name = file_name, - chunks = chunks, - last_chunk = file_len - chunks * FILE_TRANSFER_CHUNK_SIZE, - current_chunk = 0, - file = kissmods.open_file(file_name) - } - M.downloading = true -end - local function handle_player_info(player_info) M.players[player_info.id] = player_info end @@ -146,6 +130,45 @@ local function handle_chat(data) kissui.chat.add_message(data[1], nil, data[2]) end +local function change_map(map) + if FS:fileExists(map) or FS:directoryExists(map) then + vehiclemanager.loading_map = true + freeroam_freeroam.startFreeroam(map) + else + kissui.chat.add_message("Map file doesn't exist. Check if mod containing map is enabled", kissui.COLOR_RED) + disconnect() + end +end + +local function on_finished_download() + vehiclemanager.loading_map = true + M.downloads_status = {} + change_map(M.connection.server_info.map) +end + +local function on_download_progress(data) + local name, recieved, total = data[1], data[2], data[3] + kissui.show_download = true + + M.downloads_status[name] = { + name = name, + progress = recieved/total + } + if recieved >= total then + M.downloads_status[name] = nil + M.connection.mods_left = M.connection.mods_left - 1 + kissmods.mount_mod(name) + end + if M.connection.mods_left <= 0 then + kissui.show_download = false + on_finished_download() + end +end + +local function on_state_update(data) + print("Updating the mod's state is not implemented yet!") +end + local function onExtensionLoaded() message_handlers.VehicleUpdate = vehiclemanager.update_vehicle message_handlers.VehicleSpawn = vehiclemanager.spawn_vehicle @@ -161,9 +184,11 @@ local function onExtensionLoaded() message_handlers.CouplerAttached = vehiclemanager.attach_coupler message_handlers.CouplerDetached = vehiclemanager.detach_coupler message_handlers.ElectricsUndefinedUpdate = vehiclemanager.electrics_diff_update + message_handlers.DownloadProgress = on_download_progress + message_handlers.StateUpdate = on_state_update end -local function send_data(raw_data, reliable) +local function send_data_no_connection_check(raw_data, reliable) if type(raw_data) == "number" then print("NOT IMPLEMENTED. PLEASE REPORT TO KISSMP DEVELOPERS. CODE: "..raw_data) return @@ -175,7 +200,6 @@ local function send_data(raw_data, reliable) else data = jsonEncode(raw_data) end - if not M.connection.connected then return -1 end local len = #data local len = ffi.string(ffi.new("uint32_t[?]", 1, {len}), 4) if reliable then @@ -187,6 +211,11 @@ local function send_data(raw_data, reliable) M.connection.tcp:send(data) end +local function send_data(raw_data, reliable) + if not M.connection.connected then return -1 end + send_data_no_connection_check(raw_data, reliable) +end + local function sanitize_addr(addr) -- Trim leading and trailing spaces that might occur during a copy/paste local sanitized = addr:gsub("^%s*(.-)%s*$", "%1") @@ -203,20 +232,11 @@ local function generate_secret(server_identifier) return hashStringSHA1(secret) end -local function change_map(map) - if FS:fileExists(map) or FS:directoryExists(map) then - vehiclemanager.loading_map = true - freeroam_freeroam.startFreeroam(map) - else - kissui.chat.add_message("Map file doesn't exist. Check if mod containing map is enabled", kissui.COLOR_RED) - disconnect() - end -end - local function connect(addr, player_name) if M.connection.connected then disconnect() end + M.downloads_status = {} M.players = {} print("Connecting...") @@ -227,10 +247,16 @@ local function connect(addr, player_name) local connected, err = M.connection.tcp:connect("127.0.0.1", "7894") -- Send server address to the bridge - local addr_lenght = ffi.string(ffi.new("uint32_t[?]", 1, {#addr}), 4) - M.connection.tcp:send(addr_lenght) + local addr_length = ffi.string(ffi.new("uint32_t[?]", 1, {#addr}), 4) + M.connection.tcp:send(addr_length) M.connection.tcp:send(addr) + -- Send the initial state + kissui.chat.add_message("Sending state...") + send_data_no_connection_check({ + StateUpdate = state + }, true) + local connection_confirmed = M.connection.tcp:receive(1) if connection_confirmed then if connection_confirmed ~= string.char(1) then @@ -319,11 +345,6 @@ local function send_messagepack(data_type, reliable, data) send_data(data_type, reliable, data) end -local function on_finished_download() - vehiclemanager.loading_map = true - change_map(M.connection.server_info.map) -end - local function send_ping() ping_send_time = socket.gettime() send_data( @@ -339,9 +360,7 @@ local function cancel_download() io.close(current_download.file) current_download = nil M.downloading = false]]-- - for k, v in pairs(M.downloads) do - M.downloads[k]:close() - end + M.downloads_status = {} end local function onUpdate(dt) @@ -371,41 +390,7 @@ local function onUpdate(dt) end end elseif string.byte(msg_type) == 0 then -- Binary data - M.downloading = true - kissui.show_download = true - local name_b = M.connection.tcp:receive(4) - local len_n = ffi.cast("uint32_t*", ffi.new("char[?]", 5, name_b)) - local name, _, _ = M.connection.tcp:receive(len_n[0]) - local chunk_n_b = M.connection.tcp:receive(4) - local chunk_a_b = M.connection.tcp:receive(4) - local read_size_b = M.connection.tcp:receive(4) - local chunk_n = ffi.cast("uint32_t*", ffi.new("char[?]", 5, chunk_n_b))[0] - local file_length = ffi.cast("uint32_t*", ffi.new("char[?]", 5, chunk_a_b))[0] - local read_size = ffi.cast("uint32_t*", ffi.new("char[?]", 5, read_size_b))[0] - local file_data, _, _ = M.connection.tcp:receive(read_size) - M.downloads_status[name] = { - name = name, - progress = 0 - } - M.downloads_status[name].progress = chunk_n * FILE_TRANSFER_CHUNK_SIZE / file_length - local file = M.downloads[name] - if not file then - M.downloads[name] = kissmods.open_file(name) - end - M.downloads[name]:write(file_data) - if read_size < FILE_TRANSFER_CHUNK_SIZE then - M.downloading = false - kissui.show_download = false - kissmods.mount_mod(name) - M.downloads[name]:close() - M.downloads[name] = nil - M.downloads_status = {} - M.connection.mods_left = M.connection.mods_left - 1 - end - if M.connection.mods_left <= 0 then - on_finished_download() - end - M.connection.tcp:settimeout(0.0) + -- BINARY DOWNLOADS DEPRECATED break elseif string.byte(msg_type) == 2 then local len_b = M.connection.tcp:receive(4) diff --git a/kissmp-bridge/Cargo.toml b/kissmp-bridge/Cargo.toml index fe9f971..97a7909 100644 --- a/kissmp-bridge/Cargo.toml +++ b/kissmp-bridge/Cargo.toml @@ -31,4 +31,5 @@ rodio = "0.14" cpal = "0.13" fon = "0.5.0" log = "0.4" -indoc = "1.0" \ No newline at end of file +indoc = "1.0" +dirs = "4.0.0" \ No newline at end of file diff --git a/kissmp-bridge/src/main.rs b/kissmp-bridge/src/main.rs index 880608d..b63c621 100644 --- a/kissmp-bridge/src/main.rs +++ b/kissmp-bridge/src/main.rs @@ -2,17 +2,62 @@ pub mod discord; pub mod http_proxy; pub mod voice_chat; + +use anyhow::Context; use futures::stream::FuturesUnordered; -use futures::{StreamExt}; +use futures::StreamExt; +use tokio::fs::{File, self, OpenOptions}; +use std::collections::HashMap; +use std::collections::hash_map::Entry; +use std::error::Error; use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, ToSocketAddrs}; -use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, WriteHalf}; +use std::path::{Path, PathBuf}; +use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, WriteHalf, BufWriter}; use tokio::net::{TcpListener, TcpStream}; +use std::sync::Arc; +use tokio::sync::Mutex; #[macro_use] extern crate log; const SERVER_IDLE_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(120); const CONNECTED_BYTE: &[u8] = &[1]; +type ArcMutex = Arc>; + +type ActiveDownloadsHashMap = HashMap; +struct ActiveDownload { + path: PathBuf, + writer: BufWriter, + recieved: u32, + file_size: u32 +} + +impl ActiveDownload { + fn new(path: PathBuf, writer: BufWriter, file_size: u32) -> Self { Self { path, writer, recieved: 0, file_size } } +} + +#[derive(Debug)] +struct JsonParseError { + data: Vec, + source: serde_json::Error +} + +impl JsonParseError { + fn new(data: Vec, source: serde_json::Error) -> Self { Self { data, source } } +} + +impl std::fmt::Display for JsonParseError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "Could not parse JSON") + } +} + +impl Error for JsonParseError { + fn source(&self) -> Option<&(dyn Error + 'static)> { + Some(&self.source) + } +} + #[derive(Debug, Clone)] pub struct DiscordState { pub server_name: Option, @@ -20,7 +65,7 @@ pub struct DiscordState { async fn read_pascal_bytes( stream: &mut R -) -> Result, anyhow::Error> { +) -> anyhow::Result> { let mut buffer = [0; 4]; stream.read_exact(&mut buffer).await?; let len = u32::from_le_bytes(buffer) as usize; @@ -32,7 +77,7 @@ async fn read_pascal_bytes( async fn write_pascal_bytes( stream: &mut W, bytes: &mut Vec -) -> Result<(), anyhow::Error>{ +) -> anyhow::Result<()>{ let len = bytes.len() as u32; let mut data = Vec::with_capacity(len as usize + 4); data.append(&mut len.to_le_bytes().to_vec()); @@ -40,6 +85,20 @@ async fn write_pascal_bytes( Ok(stream.write_all(&data).await?) } + +fn correct_path_for_unix(path: &Path) -> PathBuf { + dirs::home_dir() + .unwrap() + .join(".steam/steam/steamapps/compatdata/284160/pfx/drive_c/") + .join( + path.to_str() + .unwrap() + .to_string() + .replace(r#"C:\"#, "") + .replace(r#"\"#, "/") + ) +} + #[tokio::main] async fn main() { shared::init_logging(); @@ -81,12 +140,38 @@ async fn main() { } }; + info!("Getting initial state..."); + + let initial_state = match read_client_command(&mut client_stream).await { + Ok((_, c)) => { + if let shared::ClientCommand::StateUpdate(initial_state) = c { + initial_state + } else { + error!("Client replied back with a different command instead of providing the state."); + error!("{:?}", c); + continue; + } + }, + Err(e) => { + error!("Could not get a response back from the client: {:?}", e); + continue; + }, + }; + + if cfg!(unix) && !initial_state.disregard_unix_path_correction { + warn!( + "We are going to change the given path from the mod to a compatiable path of the most common Steam Proton prefix. This case this path will be:\n{}", + correct_path_for_unix(&Path::new(&initial_state.download_directory).to_path_buf()).to_string_lossy() + ); + } + info!("Connecting to {}...", addr); - connect_to_server(addr, client_stream, discord_tx.clone()).await; + connect_to_server(initial_state, addr, client_stream, discord_tx.clone()).await; } } async fn connect_to_server( + initial_state: shared::State, addr: SocketAddr, client_stream: TcpStream, discord_tx: std::sync::mpsc::Sender @@ -161,6 +246,8 @@ async fn connect_to_server( }; tokio::spawn(async move { + let state: ArcMutex = Arc::new(Mutex::new(initial_state)); + let mut active_downloads: ActiveDownloadsHashMap = HashMap::new(); debug!("Starting tasks"); match tokio::try_join!( async { @@ -174,44 +261,49 @@ async fn connect_to_server( Ok(()) }, client_outgoing( + &state, server_commands_receiver, client_stream_writer), client_incoming( + &state, server_connection.connection.clone(), vc_playback_sender.clone(), client_stream_reader, vc_recording_sender, client_event_sender), server_outgoing( + &state, server_connection.connection.clone(), client_event_receiver), server_incoming( + &state, server_commands_sender, vc_playback_sender, - server_connection), + server_connection, + &mut active_downloads + ), ) { Ok(_) => debug!("Tasks completed successfully"), Err(e) => warn!("Tasks ended due to exception: {}", e), } + if active_downloads.len() > 0 { + warn!("Unfinished downloads. Deleting..."); + for (name, ActiveDownload { path, ..}) in active_downloads { + warn!("Deleting {}", name); + if let Err(e) = fs::remove_file(path).await { + error!("Failed to delete unfinished download {}: {}", name, e); + } + } + }; discord_tx.send(DiscordState { server_name: None }).unwrap(); }); } fn server_command_to_client_bytes(command: shared::ServerCommand) -> Vec { match command { - shared::ServerCommand::FilePart(name, data, chunk_n, file_size, data_left) => { - let name_b = name.as_bytes(); - let mut result = vec![0]; - result.append(&mut (name_b.len() as u32).to_le_bytes().to_vec()); - result.append(&mut name_b.to_vec()); - result.append(&mut chunk_n.to_le_bytes().to_vec()); - result.append(&mut file_size.to_le_bytes().to_vec()); - result.append(&mut data_left.to_le_bytes().to_vec()); - result.append(&mut data.clone()); - result - } - shared::ServerCommand::VoiceChatPacket(_, _, _) => panic!("Voice packets have to handled by the bridge itself."), + shared::ServerCommand::FilePart(..) => panic!("The client no longer handles file downloads directly."), + shared::ServerCommand::VoiceChatPacket(..) => panic!("Voice packets have to handled by the bridge itself."), _ => { let json = serde_json::to_string(&command).unwrap(); //println!("{:?}", json); @@ -224,98 +316,210 @@ fn server_command_to_client_bytes(command: shared::ServerCommand) -> Vec { } } -type AHResult = Result<(), anyhow::Error>; - async fn client_outgoing( + _state: &ArcMutex, mut server_commands_receiver: tokio::sync::mpsc::Receiver, mut client_stream_writer: WriteHalf -) -> AHResult { +) -> anyhow::Result<()> { while let Some(server_command) = server_commands_receiver.recv().await { - client_stream_writer.write_all(server_command_to_client_bytes(server_command).as_ref()).await?; + client_stream_writer.write_all(&server_command_to_client_bytes(server_command)).await?; } debug!("Server outgoing closed"); Ok(()) } async fn server_incoming( + state: &ArcMutex, server_commands_sender: tokio::sync::mpsc::Sender, vc_playback_sender: std::sync::mpsc::Sender, - server_connection: quinn::generic::NewConnection -) -> AHResult { + server_connection: quinn::generic::NewConnection, + active_downloads: &mut ActiveDownloadsHashMap +) -> anyhow::Result<()> { + // Shorthand + let command_from_bytes = |b: &[u8]| -> anyhow::Result { + Ok(bincode::deserialize::(b)?) + }; + let mut reliable_commands = server_connection .uni_streams .map(|stream| async { - Ok::<_, anyhow::Error>(read_pascal_bytes(&mut stream?).await?) + anyhow::Ok( + command_from_bytes(read_pascal_bytes(&mut stream?).await?.as_slice()) + ) }); let mut unreliable_commands = server_connection .datagrams .map(|data| async { - Ok::<_, anyhow::Error>(data?.to_vec()) + anyhow::Ok( + command_from_bytes(data?.as_ref())? + ) }) .buffer_unordered(1024); loop { - let command_bytes = + let command: shared::ServerCommand = tokio::select! { Some(reliable_command) = reliable_commands.next() => { - reliable_command.await? + reliable_command.await?? }, Some(unreliable_command) = unreliable_commands.next() => { unreliable_command? }, else => break }; - let command = bincode::deserialize::(command_bytes.as_ref())?; + match command { shared::ServerCommand::VoiceChatPacket(client, pos, data) =>{ let _ = vc_playback_sender.send(voice_chat::VoiceChatPlaybackEvent::Packet( client, pos, data, )); }, + shared::ServerCommand::FilePart(name, data, chunk_n, file_size, sent) => { + let mut entry = active_downloads.entry(name.clone()); + let ActiveDownload { writer, recieved, file_size, .. } = match entry { + Entry::Occupied(ref mut occupied) => { + occupied.get_mut() + }, + Entry::Vacant(vacant) => { + let name_ref = vacant.key(); + if !name_ref.ends_with(".zip") { + return Err(anyhow::Error::msg(format!("The server tried to send something other than a zip: {}", name_ref))) + } + + if chunk_n != 0 { + return Err(anyhow::Error::msg("A download was started mid transfer.")) + } + + let s = &state.lock().await; + let download_directory: PathBuf = { + if cfg!(unix) && !s.disregard_unix_path_correction { + // For the forseeable future, BeamNG is going to be a Windows binary inside Proton so this will always be applied when on Linux. + correct_path_for_unix(&Path::new(&s.download_directory).to_path_buf()) + } else { + Path::new(&s.download_directory).to_path_buf() + } + }; + + let path = download_directory.join(name_ref); + + let f = OpenOptions::new() + .write(true) + .create_new(true) + .open(&path).await + .with_context(|| format!("Could not create file for download: {}", name_ref))?; + + // Allocate space to catch low storage early + f.set_len(file_size as u64).await?; + + vacant.insert(ActiveDownload::new( + path, + BufWriter::new(f), + file_size + )) + }, + }; + writer.write(&data).await.context("Failed to write bytes to file that is being downloaded.")?; + *recieved += sent; + let r = *recieved; + let s = *file_size; + if r >= s { + writer.flush().await + .with_context(|| format!("Failed to flush bytes to file that finished downloading: {}", name))?; + active_downloads.remove(&name); + } + server_commands_sender.send(shared::ServerCommand::DownloadProgress(name, r, s)).await?; + } + shared::ServerCommand::StateUpdate(..) => { + error!("Server tried to update the state of the client! Are we connected to a evil server?"); + } _ => server_commands_sender.send(command).await? }; }; + debug!("Server incoming closed"); Ok(()) } +/** + Reads bytes from the client and returns the command the client is trying to send and if it should be sent reliably to the server (if applicable.) +*/ +async fn read_client_command( + client_stream_reader: &mut (impl AsyncRead + std::marker::Unpin) +) -> anyhow::Result<(bool, shared::ClientCommand)> { + let reliable = { + let mut buffer = [0; 1]; + client_stream_reader.read_exact(&mut buffer).await?; + buffer[0] == 1 + }; + let data_size = { + let mut buffer = [0; 4]; + client_stream_reader.read_exact(&mut buffer).await?; + i32::from_le_bytes(buffer) as usize + }; + let mut data_buf = vec![0; data_size]; + client_stream_reader.read_exact(&mut data_buf).await?; + Ok(( + reliable, + serde_json::from_slice::(&data_buf) + .map_err(|e| JsonParseError::new(data_buf, e))? + )) +} + async fn client_incoming( + state: &ArcMutex, server_stream: quinn::generic::Connection, vc_playback_sender: std::sync::mpsc::Sender, mut client_stream_reader: tokio::io::ReadHalf, vc_recording_sender: std::sync::mpsc::Sender, client_event_sender: tokio::sync::mpsc::UnboundedSender<(bool, shared::ClientCommand)> -) -> AHResult { - let mut buffer = [0; 1]; - while let Ok(_) = client_stream_reader.read_exact(&mut buffer).await { - let reliable = buffer[0] == 1; - let mut len_buf = [0; 4]; - let _ = client_stream_reader.read_exact(&mut len_buf).await; - let len = i32::from_le_bytes(len_buf) as usize; - let mut data = vec![0; len]; - let _ = client_stream_reader.read_exact(&mut data).await; - let decoded = serde_json::from_slice::(&data); - if let Ok(decoded) = decoded { - match decoded { - shared::ClientCommand::SpatialUpdate(left_ear, right_ear) => { - let _ = vc_playback_sender.send( - voice_chat::VoiceChatPlaybackEvent::PositionUpdate( - left_ear, right_ear, - ), - ); - } - shared::ClientCommand::StartTalking => { - let _ = vc_recording_sender.send(voice_chat::VoiceChatRecordingEvent::Start); - } - shared::ClientCommand::EndTalking => { - let _ = vc_recording_sender.send(voice_chat::VoiceChatRecordingEvent::End); +) -> anyhow::Result<()> { + loop { + let (reliable, client_command) = match read_client_command(&mut client_stream_reader).await { + Ok(c) => c, + Err(e) => { + if let Some(e_json) = e.downcast_ref::() { + let source = e_json.source().unwrap(); + error!("Error decoding json: {:?}", source); + error!("Data recieved:\n{:?}", String::from_utf8(e_json.data.clone())); + continue; + } else if let Some(e_io) = e.downcast_ref::() { + match e_io.kind() { + // Stream must be closing. + std::io::ErrorKind::UnexpectedEof => { + break; + } + _ => { + error!("IO error: {:?}", e); + break; + } + } + } else { + error!("Other error: {:?}", e); + break; } - _ => client_event_sender.send((reliable, decoded)).unwrap(), - }; - } else { - error!("error decoding json {:?}", decoded); - error!("{:?}", String::from_utf8(data)); + }, + }; + + match client_command { + shared::ClientCommand::SpatialUpdate(left_ear, right_ear) => { + let _ = vc_playback_sender.send( + voice_chat::VoiceChatPlaybackEvent::PositionUpdate( + left_ear, right_ear, + ), + ); + } + shared::ClientCommand::StartTalking => { + let _ = vc_recording_sender.send(voice_chat::VoiceChatRecordingEvent::Start); + } + shared::ClientCommand::EndTalking => { + let _ = vc_recording_sender.send(voice_chat::VoiceChatRecordingEvent::End); + } + shared::ClientCommand::StateUpdate(new_state) => { + let mut s = state.lock().await; + *s = new_state; + } + _ => client_event_sender.send((reliable, client_command)).unwrap(), } } info!("Connection with game is closed"); @@ -325,9 +529,10 @@ async fn client_incoming( } async fn server_outgoing( + _state: &ArcMutex, server_stream: quinn::generic::Connection, mut client_event_receiver: tokio::sync::mpsc::UnboundedReceiver<(bool, shared::ClientCommand)> -) -> AHResult { +) -> anyhow::Result<()> { while let Some((reliable, client_command)) = client_event_receiver.recv().await { let mut data = bincode::serialize::(&client_command)?; if !reliable { diff --git a/shared/src/lib.rs b/shared/src/lib.rs index 2b42509..88442e1 100644 --- a/shared/src/lib.rs +++ b/shared/src/lib.rs @@ -1,6 +1,9 @@ extern crate pretty_env_logger; pub mod vehicle; +pub mod state; +pub use state::State; + use serde::{Deserialize, Serialize}; use vehicle::*; use std::io::Write; @@ -75,12 +78,14 @@ pub enum ClientCommand { CouplerDetached(CouplerDetached), ElectricsUndefinedUpdate(u32, ElectricsUndefined), VoiceChatPacket(Vec), - // Only used by bridge + /// Only used by bridge. SpatialUpdate([f32; 3], [f32; 3]), - // Only used by bridge + /// Only used by bridge. StartTalking, - // Only used by bridge + /// Only used by bridge. EndTalking, + /// Only used by bridge. Update the state to share to the bridge. + StateUpdate(State), Ping(u16), } @@ -101,8 +106,13 @@ pub enum ServerCommand { CouplerDetached(CouplerDetached), ElectricsUndefinedUpdate(u32, ElectricsUndefined), ServerInfo(ServerInfo), + /// Name, bytes, chunk number, total size, size of chunk sent FilePart(String, Vec, u32, u32, u32), VoiceChatPacket(u32, [f32; 3], Vec), + /// Only used by bridge. New state modifed by the bridge. + StateUpdate(State), + /// Only used by bridge. Download progress for a download. Name, bytes saved, total. + DownloadProgress(String, u32, u32), Pong(f64), } diff --git a/shared/src/state.rs b/shared/src/state.rs new file mode 100644 index 0000000..75cd822 --- /dev/null +++ b/shared/src/state.rs @@ -0,0 +1,8 @@ +use serde::{Deserialize, Serialize}; + +#[derive(Debug, Clone, Deserialize, Serialize)] +pub struct State { + pub download_directory: String, + /// Future use + pub disregard_unix_path_correction: bool +} \ No newline at end of file From 2b3cabfc71a001ad6eb7df9ac0d8a12ef904c0b6 Mon Sep 17 00:00:00 2001 From: Carlen White Date: Tue, 28 Dec 2021 11:05:34 -0500 Subject: [PATCH 2/5] Clarify state is being sent to bridge --- KISSMultiplayer/lua/ge/extensions/network.lua | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/KISSMultiplayer/lua/ge/extensions/network.lua b/KISSMultiplayer/lua/ge/extensions/network.lua index e631fcc..b73a96a 100644 --- a/KISSMultiplayer/lua/ge/extensions/network.lua +++ b/KISSMultiplayer/lua/ge/extensions/network.lua @@ -252,7 +252,7 @@ local function connect(addr, player_name) M.connection.tcp:send(addr) -- Send the initial state - kissui.chat.add_message("Sending state...") + kissui.chat.add_message("Sending state to bridge...") send_data_no_connection_check({ StateUpdate = state }, true) From b2375432809c86005679f23dcc47e5a3f6b07707 Mon Sep 17 00:00:00 2001 From: Carlen White Date: Sat, 1 Jan 2022 19:19:18 -0500 Subject: [PATCH 3/5] Slightly better errors --- kissmp-bridge/src/main.rs | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/kissmp-bridge/src/main.rs b/kissmp-bridge/src/main.rs index b63c621..43f6978 100644 --- a/kissmp-bridge/src/main.rs +++ b/kissmp-bridge/src/main.rs @@ -285,7 +285,13 @@ async fn connect_to_server( ) { Ok(_) => debug!("Tasks completed successfully"), - Err(e) => warn!("Tasks ended due to exception: {}", e), + Err(e) => { + if let Some(source) = e.source() { + error!("Tasks ended due to exception: {}\nSource: {}", e, source) + } else { + error!("Tasks ended due to exception: {}", e) + } + }, } if active_downloads.len() > 0 { warn!("Unfinished downloads. Deleting..."); From 9ec982c45fc9aa7e1232f552e5ba183c642f62b1 Mon Sep 17 00:00:00 2001 From: Carlen White Date: Sat, 1 Jan 2022 19:25:06 -0500 Subject: [PATCH 4/5] Rename existing files Felt like we should not delete files until we decide how much the bridge should do to manage mods and how it should be done. --- kissmp-bridge/src/main.rs | 28 ++++++++++++++++++++++++++++ 1 file changed, 28 insertions(+) diff --git a/kissmp-bridge/src/main.rs b/kissmp-bridge/src/main.rs index 43f6978..4f3404c 100644 --- a/kissmp-bridge/src/main.rs +++ b/kissmp-bridge/src/main.rs @@ -409,6 +409,34 @@ async fn server_incoming( let path = download_directory.join(name_ref); + if path.exists() { + // Rename to some_name.0.zip + let rename_path = { + let mut new_path = path.clone(); + let mut num: u8 = 0; + // Roundabout way of a do-while by beating it into shape. + // It's funny, horrifying, and clever. + while { + let mut new_name = path.file_name().unwrap().to_owned(); + new_name.push("."); + new_name.push(num.to_string()); + new_path.set_file_name(new_name); + new_path.exists() + } { + num = num.checked_add(1) + .ok_or_else(|| + anyhow::Error::msg(format!("There are 255 versions of `{}`. Clean up your mods.", path.to_string_lossy())) + )?; + }; + new_path + }; + let path_string = path.to_string_lossy(); + let rename_path_string = rename_path.to_string_lossy(); + warn!("`{}` already exists, renaming to `{}`", path_string, rename_path_string); + fs::rename(&path, &rename_path).await + .with_context(|| format!("Could not rename `{}` to `{}`", path_string, rename_path_string))?; + } + let f = OpenOptions::new() .write(true) .create_new(true) From a4b4c38cc34f2848fd9dbb591a0264ea69a7fd51 Mon Sep 17 00:00:00 2001 From: Carlen White Date: Sat, 1 Jan 2022 19:35:17 -0500 Subject: [PATCH 5/5] Slightly more proformant way of finding mods --- KISSMultiplayer/lua/ge/extensions/kissmods.lua | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/KISSMultiplayer/lua/ge/extensions/kissmods.lua b/KISSMultiplayer/lua/ge/extensions/kissmods.lua index b640b7a..546c072 100644 --- a/KISSMultiplayer/lua/ge/extensions/kissmods.lua +++ b/KISSMultiplayer/lua/ge/extensions/kissmods.lua @@ -83,17 +83,13 @@ local function mount_mods(list) end local function update_status(mod) - local search_results = FS:findFiles("/kissmp_mods/", mod.name, 1) - local search_results2 = FS:findFiles("/mods/", mod.name, 99) - - for _, v in pairs(search_results2) do - table.insert(search_results, v) - end + local file_path = FS:findFiles("/kissmp_mods/", mod.name, 1)[1] or + FS:findFiles("/mods/", mod.name, 99)[1] - if not search_results[1] then + if not file_path then mod.status = "missing" else - local file = io.open(search_results[1]) + local file = io.open(file_path) local len = file:seek("end") if len ~= mod.size then mod.status = "different"