diff --git a/Cargo.toml b/Cargo.toml index bd5926a..15e5ff7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,7 +7,7 @@ members = [ ] [workspace.package] -version = "0.3.0" +version = "0.4.0" edition = "2024" [workspace.dependencies] diff --git a/crates/client/Cargo.toml b/crates/client/Cargo.toml index cea6d71..dd2e5f1 100644 --- a/crates/client/Cargo.toml +++ b/crates/client/Cargo.toml @@ -11,7 +11,7 @@ ws = ["tokio-tungstenite", "futures"] cli = ["clap", "ctrlc", "anstyle"] [[bin]] -name = "client-cli" +name = "client" path = "src/bin/mod.rs" required-features = ["cli"] @@ -29,6 +29,7 @@ anstyle = { version = "1.0", optional = true } tokio-tungstenite = { workspace = true, optional = true } futures = { workspace = true, optional = true } ipnetwork = "0.21.1" +console-subscriber = "0.4.1" # Crypto snow = "0.9" diff --git a/crates/client/src/bin/command/connect.rs b/crates/client/src/bin/command/connect.rs index b5db588..590cad1 100644 --- a/crates/client/src/bin/command/connect.rs +++ b/crates/client/src/bin/command/connect.rs @@ -1,11 +1,17 @@ use clap::Args; use std::path::PathBuf; use std::{process, thread}; +use std::net::{IpAddr, SocketAddr}; +use std::sync::Arc; use std::time::Duration; use ctrlc::set_handler; +use tokio::sync::watch; use tracing::{debug, error, info}; +use tun_rs::AsyncDevice; +use client::network::RouteState; use client::runtime::error::RuntimeError; use client::runtime::Runtime; +use client::runtime::state::RuntimeState; use shared::connection_config::{ConnectionConfig, InterfaceConfig, RuntimeConfig}; use shared::network::find_available_ifname; @@ -42,7 +48,7 @@ impl ConnectCmd { None => unreachable!("config or key is required should protected by clap") } }; - + if config.runtime.is_none() { config.runtime = Some(RuntimeConfig::default()); } @@ -61,26 +67,55 @@ impl ConnectCmd { } } + let sock_addr = match config.general.host.parse() { + Ok(addr) => SocketAddr::new(addr, config.general.port), + Err(err) => { + eprintln!("failed to resolve host: {}", err); + process::exit(1); + } + }; + + let iface_config = config.interface.unwrap_or_default(); + let tun = match shared::tun::setup_tun( + iface_config.name.clone(), + iface_config.mtu, + false, + ).await { + Ok(tun) => Arc::new(tun), + Err(err) => { + eprintln!("failed to setup tun: {}", err); + process::exit(1); + } + }; + + let routes = match RouteState::new(sock_addr.ip(), iface_config.name).build() + { + Ok(routes) => Arc::new(routes), + Err(err) => { + eprintln!("failed to setup routes: {}", err); + process::exit(1); + } + }; + let routes_clone = routes.clone(); + let runtime = Runtime::new( - match config.general.host.parse() { - Ok(addr) => addr, - Err(err) => { - eprintln!("failed to resolve host: {}", err); - process::exit(1); - } - }, - config.general.port, + sock_addr, + tun.clone(), config.general.alg, config.credentials, - config.runtime.unwrap_or_default(), - config.interface.unwrap_or_default(), + config.runtime.unwrap_or_default() ); - let stop_tx = runtime.stop_tx.clone(); + let state_tx = runtime.state_tx.clone(); + + tokio::spawn(tun_service( + state_tx.clone(), + tun.clone() + )); set_handler(move || { println!("Ctrl-C received, stopping runtime..."); - match stop_tx.send(RuntimeError::StopSignal) { + match state_tx.send(RuntimeState::Error(RuntimeError::StopSignal)) { Ok(_) => { debug!("stop signal sent from Ctrl-C handler"); } @@ -88,6 +123,7 @@ impl ConnectCmd { debug!("stop signal not sent from Ctrl-C handler: {}", err); } } + routes.restore(); thread::sleep(Duration::from_secs(2)); process::exit(0); }).expect("error setting Ctrl-C handler"); @@ -98,9 +134,57 @@ impl ConnectCmd { info!("runtime stopped"); } _ => { + routes_clone.restore(); error!("{}", error); } } } } -} \ No newline at end of file +} + + +pub async fn tun_service( + state_tx: watch::Sender, + tun: Arc, +) { + let mut state_rx = state_tx.subscribe(); + loop { + match state_rx.changed().await { + Ok(_) => { + debug!("tun service execute"); + let state = state_rx.borrow().clone(); + match state { + RuntimeState::Connected((payload, _)) => { + match payload.ipaddr { + IpAddr::V4(addr) => { + if let Err(err) = tun.set_network_address(addr, 32, None) { + state_tx.send(RuntimeState::Error(RuntimeError::IO( + format!("failed to set ipv4 network address: {}", err) + ))).expect("state_tx channel broken in tun_service"); + break; + } + }, + IpAddr::V6(addr) => { + if let Err(err) = tun.add_address_v6(addr, 128) { + state_tx.send(RuntimeState::Error(RuntimeError::IO( + format!("failed to add ipv6 network address: {}", err) + ))).expect("state_tx channel broken in tun_service"); + break; + } + } + } + }, + RuntimeState::Error(_) => { + debug!("tun service closed by global error state"); + break; + }, + _ => {} + } + } + Err(err) => { + debug!("state_tx channel error in tun service: {}", err); + break; + } + } + } +} diff --git a/crates/client/src/bin/mod.rs b/crates/client/src/bin/mod.rs index 4097989..3deb48f 100644 --- a/crates/client/src/bin/mod.rs +++ b/crates/client/src/bin/mod.rs @@ -12,7 +12,8 @@ const LOG_PREFIX: &str = "client.log"; async fn main() { let opt = opt::Opt::parse(); opt.init_logging(); - + // console_subscriber::init(); + match opt.cmd { Commands::Connect(cmd) => cmd.exec().await, } diff --git a/crates/client/src/bin/opt.rs b/crates/client/src/bin/opt.rs index 36718cd..c4d9888 100644 --- a/crates/client/src/bin/opt.rs +++ b/crates/client/src/bin/opt.rs @@ -25,19 +25,19 @@ impl Opt { } else { tracing::Level::INFO }); - + let file_appender = tracing_appender::rolling::daily(LOG_DIR, LOG_PREFIX); let (non_blocking, _guard) = tracing_appender::non_blocking(file_appender); - + let file_layer = fmt::layer() .with_writer(non_blocking) .with_ansi(false) .with_filter(log_level); - + let console_layer = fmt::layer() .with_ansi(std::io::stdout().is_terminal()) .with_filter(log_level); - + tracing_subscriber::registry() .with(file_layer) .with(console_layer) diff --git a/crates/client/src/network.rs b/crates/client/src/network.rs index 31ca48f..a395cab 100644 --- a/crates/client/src/network.rs +++ b/crates/client/src/network.rs @@ -1,5 +1,5 @@ use std::net::{IpAddr}; -use tracing::{info, warn}; +use tracing::{debug, info, warn}; use std::process::Command; use anyhow::format_err; use std::fmt::Write; @@ -13,7 +13,7 @@ pub struct RouteState { use ipnetwork::{IpNetwork, NetworkSize}; impl RouteState { - pub fn new(remote: IpAddr, dev: String) -> Self { + pub fn new(remote: IpAddr, dev: String) -> Self { Self { dev, default_gateway: None, @@ -30,7 +30,7 @@ impl RouteState { format_err!("failed to get default device: {}", e) )?; self.default_gateway = Some(default_gateway); - info!("default gateway: {} from dev {}", default_gateway, default_dev_name); + debug!("default gateway: {} from dev {}", default_gateway, default_dev_name); add_route( &IpNetwork::from_str("0.0.0.0/1")?, None, @@ -56,7 +56,7 @@ impl RouteState { Ok(self) } - pub fn restore(&mut self) { + pub fn restore(&self) { for addr in self.exclude.iter() { match delete_route( addr, diff --git a/crates/client/src/runtime/error.rs b/crates/client/src/runtime/error.rs index 28de886..80b0977 100644 --- a/crates/client/src/runtime/error.rs +++ b/crates/client/src/runtime/error.rs @@ -5,14 +5,8 @@ use thiserror::Error; pub enum RuntimeError { #[error("IO: {0}")] IO(String), - #[error("Tun: {0}")] - Tun(String), - #[error("Network: {0}")] - Network(String), #[error("Handshake: {0}")] Handshake(String), - #[error("Disconnect: {0}")] - Disconnect(String), #[error("Unexpected: {0}")] Unexpected(String), #[error("StopSignal")] diff --git a/crates/client/src/runtime/worker/handshake.rs b/crates/client/src/runtime/handshake.rs similarity index 91% rename from crates/client/src/runtime/worker/handshake.rs rename to crates/client/src/runtime/handshake.rs index e741512..cb5c3f9 100644 --- a/crates/client/src/runtime/worker/handshake.rs +++ b/crates/client/src/runtime/handshake.rs @@ -1,6 +1,7 @@ use std::sync::Arc; use std::time::Duration; use snow::{Builder, HandshakeState, StatelessTransportState}; +use tokio::select; use tracing::warn; use shared::connection_config::CredentialsConfig; use shared::handshake::{ @@ -8,21 +9,21 @@ use shared::handshake::{ NOISE_IK_PSK2_25519_AESGCM_BLAKE2S }; use shared::protocol::{ - EncryptedHandshake, - HandshakeError, - HandshakeResponderBody, - HandshakeResponderPayload, + EncryptedHandshake, + HandshakeError, + HandshakeResponderBody, + HandshakeResponderPayload, Packet }; use shared::session::Alg; -use crate::runtime::transport::Transport; -use super::super::{ - error::RuntimeError +use super::{ + error::RuntimeError, + transport::Transport }; fn initial( - alg: Alg, + alg: Alg, cred: &CredentialsConfig ) -> Result<(EncryptedHandshake, HandshakeState), RuntimeError> { let mut initiator = Builder::new(match alg { @@ -40,7 +41,7 @@ fn initial( } fn complete( - handshake: &EncryptedHandshake, + handshake: &EncryptedHandshake, mut initiator: HandshakeState ) -> Result<(HandshakeResponderBody, StatelessTransportState), RuntimeError> { let mut buffer = [0u8; 65536]; @@ -53,7 +54,7 @@ fn complete( } } -pub(super) async fn handshake_step( +pub async fn handshake_step( transport: Arc, cred: CredentialsConfig, alg: Alg, @@ -64,12 +65,12 @@ pub(super) async fn handshake_step( alg, &cred )?; - + transport.send(&Packet::HandshakeInitial(handshake).to_bytes()).await?; // [step 2] Server complete let mut buffer = [0u8; 65536]; - let resp = tokio::select! { + let resp = select! { _ = tokio::time::sleep(timeout) => Err(RuntimeError::Handshake( format!("server timeout ({:?})", timeout) )), @@ -107,4 +108,4 @@ pub(super) async fn handshake_step( } } } -} +} \ No newline at end of file diff --git a/crates/client/src/runtime/mod.rs b/crates/client/src/runtime/mod.rs index b54a556..ab359dd 100644 --- a/crates/client/src/runtime/mod.rs +++ b/crates/client/src/runtime/mod.rs @@ -1,75 +1,85 @@ pub mod error; mod worker; mod transport; +pub mod state; +mod handshake; -use std::net::{IpAddr, SocketAddr}; -use std::time::Duration; +use std::net::SocketAddr; +use std::ops::Deref; +use std::sync::Arc; use self::{ error::RuntimeError, }; -use tokio::sync::broadcast; +use tokio::sync::watch; use tracing::debug; +use tun_rs::AsyncDevice; use shared::session::Alg; -use shared::connection_config::{CredentialsConfig, InterfaceConfig, RuntimeConfig}; +use shared::connection_config::{CredentialsConfig, RuntimeConfig}; +use crate::runtime::state::RuntimeState; pub struct Runtime { sock: SocketAddr, alg: Alg, cred: CredentialsConfig, config: RuntimeConfig, - iface_config: InterfaceConfig, - pub stop_tx: broadcast::Sender + tun: Arc, + pub state_tx: watch::Sender } impl Runtime { pub fn new( - addr: IpAddr, - port: u16, + sock: SocketAddr, + tun: Arc, alg: Alg, cred: CredentialsConfig, config: RuntimeConfig, - iface_config: InterfaceConfig, ) -> Self { - let (stop_tx, _) = broadcast::channel::(10); + let (tx, _) = watch::channel(RuntimeState::Connecting); Self { - sock: SocketAddr::new(addr, port), + sock, + tun, alg, cred, config, - iface_config, - stop_tx + state_tx: tx, } } pub async fn run(&self) -> Result<(), RuntimeError> { let worker = worker::create( self.sock, - self.stop_tx.clone(), + self.tun.clone(), + self.state_tx.clone(), self.cred.clone(), self.alg.clone(), self.config.clone(), - self.iface_config.clone(), ); - - let mut stop_rx = self.stop_tx.subscribe(); + + let mut state_rx = self.state_tx.subscribe(); tokio::select! { resp = worker => match resp { Ok(_) => { - debug!("worker stopped without error, waiting for stop signal"); - tokio::time::sleep(Duration::from_secs(2)).await; - Ok(()) + debug!("worker stopped without error, try get err from state_tx"); + match self.state_tx.borrow().deref() { + RuntimeState::Error(err) => Err(err.clone()), + _ => unreachable!("program closed without error") + } }, Err(err) => { debug!("worker result with error"); Err(err) } }, - err = stop_rx.recv() => match err { - Ok(err) => Err(err), + state = state_rx.wait_for(|val| matches!(val, RuntimeState::Error(_))) => match state { + Ok(state) => match state.deref() { + RuntimeState::Error(err) => Err(err.clone()), + _ => unreachable!("expected RuntimeState::Error(_), got {err:?}", err = state) + }, Err(err) => { - Err(RuntimeError::IO(format!("stop channel err: {err}"))) + debug!("state channel broken"); + Err(RuntimeError::IO(format!("state channel err: {err}"))) } } } diff --git a/crates/client/src/runtime/state.rs b/crates/client/src/runtime/state.rs new file mode 100644 index 0000000..3c3bae7 --- /dev/null +++ b/crates/client/src/runtime/state.rs @@ -0,0 +1,11 @@ +use std::sync::Arc; +use snow::StatelessTransportState; +use shared::protocol::HandshakeResponderPayload; +use crate::runtime::error::RuntimeError; + +#[derive(Debug, Clone)] +pub enum RuntimeState { + Connecting, + Connected((HandshakeResponderPayload, Arc)), + Error(RuntimeError) +} \ No newline at end of file diff --git a/crates/client/src/runtime/transport.rs b/crates/client/src/runtime/transport.rs index 350eb9e..b17febf 100644 --- a/crates/client/src/runtime/transport.rs +++ b/crates/client/src/runtime/transport.rs @@ -16,4 +16,7 @@ pub trait TransportReceiver: Send + Sync { async fn recv(&self, buffer: &mut [u8]) -> io::Result; } -pub trait Transport: TransportSender + TransportReceiver{} +#[async_trait] +pub trait Transport: TransportSender + TransportReceiver + Send + Sync { + async fn connect(&self) -> io::Result<()>; +} diff --git a/crates/client/src/runtime/transport/udp.rs b/crates/client/src/runtime/transport/udp.rs index aedf940..76f7392 100644 --- a/crates/client/src/runtime/transport/udp.rs +++ b/crates/client/src/runtime/transport/udp.rs @@ -3,7 +3,10 @@ use crate::runtime::transport::{Transport, TransportReceiver, TransportSender}; use async_trait::async_trait; use socket2::{Domain, Protocol, Socket, Type}; use std::net::{IpAddr, Ipv4Addr, SocketAddr}; +use std::time::Duration; use tokio::net::UdpSocket; +use tracing::info; + pub struct UdpTransport { socket: UdpSocket @@ -15,7 +18,6 @@ impl UdpTransport { so_rcvbuf: usize, so_sndbuf: usize, ) -> Result { - tracing::info!("Connecting to udp://{}", addr); let socket = Socket::new( Domain::for_address(addr), Type::DGRAM, @@ -48,4 +50,13 @@ impl TransportSender for UdpTransport { } } -impl Transport for UdpTransport{} +#[async_trait] +impl Transport for UdpTransport { + async fn connect(&self) -> std::io::Result<()> { + info!("connecting to udp://{}", self.socket.peer_addr()?); + tokio::select! { + _ = self.socket.connect(self.socket.peer_addr()?) => Ok(()), + _ = tokio::time::sleep(Duration::from_secs(5)) => Err(std::io::Error::other("connection timeout")) + } + } +} diff --git a/crates/client/src/runtime/transport/ws.rs b/crates/client/src/runtime/transport/ws.rs index 2bfd3ac..2fc5656 100644 --- a/crates/client/src/runtime/transport/ws.rs +++ b/crates/client/src/runtime/transport/ws.rs @@ -1,35 +1,28 @@ -use crate::runtime::error::RuntimeError; +use std::io; use crate::runtime::transport::{Transport, TransportReceiver, TransportSender}; use async_trait::async_trait; use futures::stream::{SplitSink, SplitStream}; use futures::{SinkExt, StreamExt}; use std::net::SocketAddr; use std::sync::Arc; +use anyhow::anyhow; use tokio::net::TcpStream; use tokio::sync::Mutex; use tokio_tungstenite::tungstenite::client::IntoClientRequest; use tokio_tungstenite::tungstenite::Message; use tokio_tungstenite::{connect_async, MaybeTlsStream, WebSocketStream}; +use tracing::info; pub struct WsTransport { - write: Arc>, Message>>>, - read: Arc>>>>, + addr: SocketAddr, + write: Arc>, Message>>>>, + read: Arc>>>>>, } impl WsTransport { - pub async fn connect(addr: SocketAddr) -> Result { - tracing::info!("connecting to ws://{}", addr); - let request = format!("ws://{addr}").into_client_request().unwrap(); - let (ws_stream, _) = connect_async(request) - .await - .map_err(|e| RuntimeError::IO(format!( - "Failed to connect to WebSocket server: {}", e - )))?; - - let (write, read) = ws_stream.split(); - - Ok(Self { write: Arc::new(Mutex::new(write)) , read: Arc::new(Mutex::new(read)) }) + pub fn new(addr: SocketAddr) -> Self { + Self {addr, write: Arc::new(Mutex::new(None)) , read: Arc::new(Mutex::new(None)) } } } @@ -37,19 +30,27 @@ impl WsTransport { impl TransportReceiver for WsTransport { #[inline(always)] - async fn recv(&self, buffer: &mut [u8]) -> std::io::Result { - let mut read = self.read.lock().await; - while let Some(Ok(msg)) = read.next().await { - if let Message::Binary(data) = msg { - let len = data.len().min(buffer.len()); - buffer[..len].copy_from_slice(&data[..len]); - return Ok(len); - } + async fn recv(&self, buffer: &mut [u8]) -> io::Result { + match self.read.lock().await.as_mut() { + Some(read) => { + while let Some(Ok(msg)) = read.next().await { + if let Message::Binary(data) = msg { + let len = data.len().min(buffer.len()); + buffer[..len].copy_from_slice(&data[..len]); + return Ok(len); + } + } + + Err(io::Error::new( + io::ErrorKind::ConnectionAborted, + "WebSocket connection closed" + )) + }, + None => Err(io::Error::new( + io::ErrorKind::NotConnected, + "WebSocket connection not established" + )) } - Err(std::io::Error::new( - std::io::ErrorKind::ConnectionAborted, - "WebSocket connection closed" - )) } } @@ -57,16 +58,49 @@ impl TransportReceiver for WsTransport { impl TransportSender for WsTransport { #[inline(always)] - async fn send(&self, data: &[u8]) -> std::io::Result { - self.write.lock().await - .send(Message::Binary(data.to_vec().into())) - .await - .map(|_| data.len()) - .map_err(|e| std::io::Error::new( - std::io::ErrorKind::BrokenPipe, - e.to_string() + async fn send(&self, data: &[u8]) -> io::Result { + match self.write.lock().await.as_mut() { + Some(write) => write + .send(Message::Binary(data.to_vec().into())) + .await + .map(|_| data.len()) + .map_err(|e| io::Error::new( + io::ErrorKind::BrokenPipe, + e.to_string() + )), + None => Err(io::Error::new( + io::ErrorKind::NotConnected, + "WebSocket connection not established" )) + } } } -impl Transport for WsTransport{} +#[async_trait] +impl Transport for WsTransport{ + async fn connect(&self) -> io::Result<()> { + info!("connecting to ws://{}", self.addr); + let request = format!("ws://{}", self.addr).into_client_request().map_err(|e| { + io::Error::new( + io::ErrorKind::Other, + anyhow!("failed to create WebSocket request: {}", e) + ) + })?; + + let (ws_stream, _) = connect_async(request) + .await + .map_err(|e| io::Error::new( + io::ErrorKind::Other, + anyhow!("failed to connect to WebSocket server: {}", e) + ))?; + + let (write, read) = ws_stream.split(); + + let mut write_lock = self.write.lock().await; + *write_lock = Some(write); + let mut read_lock = self.read.lock().await; + *read_lock = Some(read); + + Ok(()) + } +} diff --git a/crates/client/src/runtime/worker/connector.rs b/crates/client/src/runtime/worker/connector.rs new file mode 100644 index 0000000..017e98c --- /dev/null +++ b/crates/client/src/runtime/worker/connector.rs @@ -0,0 +1,91 @@ +use std::sync::Arc; +use std::time::Duration; +use tokio::sync::watch; +use tracing::{debug, error}; +use shared::connection_config::CredentialsConfig; +use shared::session::Alg; +use crate::runtime::handshake::handshake_step; +use crate::runtime::state::RuntimeState; +use crate::runtime::transport::Transport; +use super::super::{ + error::RuntimeError +}; + + +const RECONNECT_DELAY: Duration = Duration::from_secs(3); + +pub(crate) async fn executor( + transport: Arc, + state_tx: watch::Sender, + // for handshake step: + cred: CredentialsConfig, + alg: Alg, + timeout: Duration +) { + let mut state_rx = state_tx.subscribe(); + state_rx.mark_changed(); + let mut ticker = tokio::time::interval(RECONNECT_DELAY); + let mut is_reconnect = false; + + loop { + match state_rx.changed().await { + Ok(_) => { + let state = state_rx.borrow().clone(); + match state { + RuntimeState::Connecting => match transport.connect().await { + Ok(_) => match handshake_step( + transport.clone(), + cred.clone(), + alg.clone(), + timeout + ).await { + Ok((payload, transport_state)) => { + is_reconnect = true; + state_tx.send(RuntimeState::Connected((payload, Arc::new(transport_state)))).expect( + "broken runtime state pipe" + ); + continue + }, + Err(err) => match is_reconnect { + false => { + state_tx.send(RuntimeState::Error(err)).expect( + "broken runtime state pipe" + ); + return; + }, + true => { + error!("{}, trying again in {:?}", err, RECONNECT_DELAY); + state_rx.mark_changed(); + ticker.tick().await; + } + } + }, + Err(err) => match is_reconnect { + false => { + state_tx.send(RuntimeState::Error( + RuntimeError::IO(format!("connecting error: {}", err)) + )).expect( + "broken runtime state pipe" + ); + return; + }, + true => { + error!("failed to reconnect: {}, trying again in {:?}", err, RECONNECT_DELAY); + ticker.tick().await; + } + } + }, + RuntimeState::Error(_) => { + debug!("handshake executor stopped by error state"); + break; + }, + _ => {} + } + }, + Err(err) => { + debug!("state_rx channel error in handshake executor: {}", err); + break; + } + } + } +} diff --git a/crates/client/src/runtime/worker/data.rs b/crates/client/src/runtime/worker/data.rs index 7b81cad..f0bc25d 100644 --- a/crates/client/src/runtime/worker/data.rs +++ b/crates/client/src/runtime/worker/data.rs @@ -1,14 +1,14 @@ +use std::ops::Deref; use crate::runtime::error::RuntimeError; use shared::protocol::{DataClientBody, DataServerBody, EncryptedData, Packet}; use shared::time::{format_duration_millis, micros_since_start}; use shared::session::SessionId; use snow::StatelessTransportState; -use std::sync::Arc; use std::time::Duration; -use tokio::sync::broadcast::{Receiver, Sender}; +use tokio::sync::watch::Sender; use tokio::sync::mpsc; use tracing::{info, warn}; - +use crate::runtime::state::RuntimeState; fn decrypt_body( encrypted: &EncryptedData, @@ -48,17 +48,28 @@ fn encrypt_body( } pub(super) async fn data_udp_executor( - stop_sender: Sender, - mut stop: Receiver, + state_tx: Sender, mut queue: mpsc::Receiver, tun_sender: mpsc::Sender>, - state: Arc, ) { + let mut state_rx = state_tx.subscribe(); + let mut state = None; + loop { tokio::select! { - _ = stop.recv() => break, - data = queue.recv() => match data { // todo: may exec in another thread from pool - Some(data) => match decrypt_body(&data, &state) { + _ = state_rx.changed() => { + match state_rx.borrow().deref() { + RuntimeState::Error(_) => break, + RuntimeState::Connecting => { + continue + }, + RuntimeState::Connected((_, transport_state)) => { + state = Some(transport_state.clone()); + } + } + }, + data = queue.recv() => match data { + Some(data) => match decrypt_body(&data, state.as_deref().unwrap()) { Ok(data_body) => match data_body { DataServerBody::KeepAlive(time) => { info!("keepalive rtt: {}", format_duration_millis( @@ -68,9 +79,8 @@ pub(super) async fn data_udp_executor( continue; }, DataServerBody::Disconnect(ref code) => { - stop_sender.send(RuntimeError::Disconnect( - format!("server disconnected code {}", code) - )).unwrap(); + warn!("got server disconnected code {}", code); + state_tx.send(RuntimeState::Connecting).unwrap(); continue; }, DataServerBody::Packet(payload) => { @@ -89,25 +99,37 @@ pub(super) async fn data_udp_executor( } pub(super) async fn data_tun_executor( - stop_sender: Sender, - mut stop: Receiver, + state_tx: Sender, mut queue: mpsc::Receiver>, udp_sender: mpsc::Sender, - state: Arc, - sid: SessionId ) { + let mut state_rx = state_tx.subscribe(); + let mut sid = SessionId::default(); + let mut state = None; + loop { tokio::select! { - _ = stop.recv() => break, - body = queue.recv() => match body { // todo: may exec in another thread from pool?? - Some(packet) => match encrypt_body(&DataClientBody::Packet(packet.into()), &state) { + _ = state_rx.changed() => { + match state_rx.borrow().deref() { + RuntimeState::Error(_) => break, + RuntimeState::Connecting => { + continue + }, + RuntimeState::Connected((payload, transport_state)) => { + sid = payload.sid; + state = Some(transport_state.clone()); + } + } + }, + body = queue.recv() => match body { + Some(packet) => match encrypt_body(&DataClientBody::Packet(packet.into()), state.as_deref().unwrap()) { Ok(encrypted) => { udp_sender.send(Packet::DataClient{ sid, encrypted }).await.unwrap(); // todo remove await }, Err(e) => { - stop_sender.send(RuntimeError::Unexpected( + state_tx.send(RuntimeState::Error(RuntimeError::Unexpected( format!("failed to encrypt data: {}", e) - )).unwrap(); + ))).expect("broken runtime state pipe in data_tun_executor"); } }, None => return @@ -117,27 +139,61 @@ pub(super) async fn data_tun_executor( } pub(super) async fn keepalive_sender( - stop_sender: Sender, - mut stop: Receiver, + state_tx: Sender, udp_sender: mpsc::Sender, - duration: Duration, - state: Arc, - sid: SessionId + duration: Duration ) { - let mut timer = tokio::time::interval(duration); + let mut keepalive_timer = tokio::time::interval(duration); + let mut state_wait_timer = tokio::time::interval(Duration::from_secs(1)); + + let mut state_rx = state_tx.subscribe(); + let mut sid = SessionId::default(); + let mut state = None; + let mut is_connected = false; loop { + match state_rx.has_changed() { + Ok(has_changed) => if has_changed { + state_rx.mark_unchanged(); + match state_rx.borrow().deref() { + RuntimeState::Error(_) => { + break + }, + RuntimeState::Connecting => { + is_connected = false; + }, + RuntimeState::Connected((payload, transport_state)) => { + sid = payload.sid; + state = Some(transport_state.clone()); + is_connected = true; + } + } + }, + Err(err) => { + warn!("state channel broken: {}", err); + break; + } + } + + if !is_connected { + state_wait_timer.tick().await; + continue; + } + tokio::select! { - _ = stop.recv() => break, - _ = timer.tick() => match encrypt_body(&DataClientBody::KeepAlive(micros_since_start()), &state) { + _ = state_rx.changed() => { + state_rx.mark_changed(); + continue + }, + _ = keepalive_timer.tick() => match encrypt_body(&DataClientBody::KeepAlive(micros_since_start()), state.as_deref().unwrap()) { Ok(encrypted) => { udp_sender.send(Packet::DataClient{ sid, encrypted }).await.unwrap(); // todo: if channel is full then we can ignore sending }, Err(e) => { - stop_sender.send(RuntimeError::Unexpected( + state_tx.send(RuntimeState::Error(RuntimeError::Unexpected( format!("failed to encrypt data: {}", e) - )).unwrap(); + ))).unwrap(); } - } + }, } } } diff --git a/crates/client/src/runtime/worker/mod.rs b/crates/client/src/runtime/worker/mod.rs index 14ac64e..689e1a1 100644 --- a/crates/client/src/runtime/worker/mod.rs +++ b/crates/client/src/runtime/worker/mod.rs @@ -1,4 +1,4 @@ -mod handshake; +mod connector; mod data; mod tun; mod transport; @@ -10,44 +10,41 @@ pub use crate::runtime::transport::udp::UdpTransport; pub use crate::runtime::transport::ws::WsTransport; use crate::{ - network::RouteState, runtime::{ error::RuntimeError, worker::{ data::{data_tun_executor, data_udp_executor, keepalive_sender}, - handshake::handshake_step, transport::{transport_listener, transport_sender}, tun::{tun_listener, tun_sender}, } }, }; -use shared::connection_config::{CredentialsConfig, InterfaceConfig, RuntimeConfig}; +use shared::connection_config::{CredentialsConfig, RuntimeConfig}; use shared::protocol::{EncryptedData, Packet}; use shared::session::Alg; -use shared::tun::setup_tun; use std::time::Duration; use std::{ net::SocketAddr, sync::Arc }; -use tokio::sync::broadcast::{Sender}; -use tokio::sync::mpsc; -use tracing::{info}; +use tokio::sync::{mpsc, watch}; +use tracing::{debug, warn}; +use tun_rs::AsyncDevice; +use crate::runtime::state::RuntimeState; use crate::runtime::transport::Transport; pub(crate) async fn create( addr: SocketAddr, - stop_tx: Sender, + tun: Arc, + state_tx: watch::Sender, cred: CredentialsConfig, alg: Alg, runtime_config: RuntimeConfig, - iface_config: InterfaceConfig, ) -> Result<(), RuntimeError> { let transport: Arc = match () { #[cfg(feature = "udp")] _ if cfg!(feature = "udp") => { - info!("using UDP transport"); Arc::new(UdpTransport::new( addr, runtime_config.so_rcvbuf, @@ -56,8 +53,7 @@ pub(crate) async fn create( } #[cfg(feature = "ws")] _ if cfg!(feature = "ws") => { - info!("using WebSocket transport"); - Arc::new(WsTransport::connect(addr).await?) + Arc::new(WsTransport::new(addr)) } _ => unreachable!("transport is not enabled, please enable transport features") }; @@ -66,70 +62,38 @@ pub(crate) async fn create( let (tun_sender_tx, tun_sender_rx) = mpsc::channel::>(runtime_config.out_tun_buf); let (data_udp_tx, data_udp_rx) = mpsc::channel::(runtime_config.data_udp_buf); let (data_tun_tx, data_tun_rx) = mpsc::channel::>(runtime_config.data_tun_buf); - - // Handshake step - let (handshake_payload, state) = match tokio::spawn(handshake_step( - transport.clone(), - cred, - alg, - Duration::from_millis(runtime_config.handshake_timeout) - )).await.unwrap() { // todo unwrap - Ok((p, state)) => (p, Arc::new(state)), - Err(err) => { - stop_tx.send(err.clone())?; - return Err(err); - } - }; // Handle incoming UDP packets - tokio::spawn(transport_listener(stop_tx.clone(), stop_tx.subscribe(), transport.clone(), data_udp_tx)); - + tokio::spawn(transport_listener(state_tx.clone(), transport.clone(), data_udp_tx)); + // Handle outgoing UDP packets - tokio::spawn(transport_sender(stop_tx.clone(), stop_tx.subscribe(), transport.clone(), udp_sender_rx)); - - + tokio::spawn(transport_sender(state_tx.clone(), transport.clone(), udp_sender_rx)); + + // Executors tokio::spawn(data_tun_executor( - stop_tx.clone(), - stop_tx.subscribe(), + state_tx.clone(), data_tun_rx, udp_sender_tx.clone(), - state.clone(), - handshake_payload.sid, )); tokio::spawn(data_udp_executor( - stop_tx.clone(), - stop_tx.subscribe(), + state_tx.clone(), data_udp_rx, - tun_sender_tx, - state.clone() + tun_sender_tx )); - let tun = Arc::new(setup_tun( - iface_config.name.clone(), - iface_config.mtu, - handshake_payload.ipaddr, - 32, - false - ).await?); - - // move from runtime - let mut routes = RouteState::new(addr.ip(), iface_config.name) - .build()?; // Handle incoming TUN packets tokio::spawn(tun_listener( - stop_tx.clone(), - stop_tx.subscribe(), + state_tx.clone(), tun.clone(), data_tun_tx )); - + // Handle outgoing TUN packets tokio::spawn(tun_sender( - stop_tx.clone(), - stop_tx.subscribe(), + state_tx.clone(), tun.clone(), tun_sender_rx )); @@ -137,26 +101,24 @@ pub(crate) async fn create( match runtime_config.keepalive { Some(duration) => { - info!("starting keepalive transport with interval {:?}", duration); + debug!("starting keepalive with interval {:?}", duration); tokio::spawn(keepalive_sender( - stop_tx.clone(), - stop_tx.subscribe(), + state_tx.clone(), udp_sender_tx, Duration::from_secs(duration), - state.clone(), - handshake_payload.sid, )); }, - None => info!("keepalive transport is disabled") - } - - let mut stop_rx = stop_tx.subscribe(); - tokio::select! { - _ = stop_rx.recv() => { - routes.restore(); - info!("listener stopped") - } + None => warn!("keepalive is disabled") } + // handshake_executor + connector::executor( + transport.clone(), + state_tx.clone(), + cred, + alg, + Duration::from_millis(runtime_config.handshake_timeout) + ).await; + Ok(()) } diff --git a/crates/client/src/runtime/worker/transport.rs b/crates/client/src/runtime/worker/transport.rs index cb63fc8..cfa11e4 100644 --- a/crates/client/src/runtime/worker/transport.rs +++ b/crates/client/src/runtime/worker/transport.rs @@ -1,25 +1,44 @@ +use std::ops::Deref; use std::sync::Arc; -use tokio::sync::broadcast::{Receiver, Sender}; +use std::time::Duration; +use tokio::sync::watch::Sender; use tokio::sync::mpsc; use tracing::{debug, error, warn}; use shared::protocol::{EncryptedData, Packet}; -use crate::runtime::error::RuntimeError; +use crate::runtime::state::RuntimeState; use crate::runtime::transport::{TransportReceiver, TransportSender}; pub async fn transport_sender( - stop_sender: Sender, - mut stop: Receiver, + state_tx: Sender, transport: Arc, mut queue: mpsc::Receiver ) { + let mut state_wait_timer = tokio::time::interval(Duration::from_secs(1)); + + let mut state_rx = state_tx.subscribe(); + let mut is_connected = false; + loop { + if !is_connected && !state_rx.has_changed().unwrap() { + state_wait_timer.tick().await; + continue; + } + tokio::select! { - _ = stop.recv() => break, + _ = state_rx.changed() => match state_rx.borrow().deref() { + RuntimeState::Error(_) => break, + RuntimeState::Connecting => { + is_connected = false; + }, + RuntimeState::Connected(_) => { + is_connected = true; + } + }, result = queue.recv() => match result { Some(packet) => match transport.send(&packet.to_bytes()).await { Ok(n) => debug!("sent transport packet with {} bytes", n), - Err(err) => { - stop_sender.send(RuntimeError::IO(format!("failed to send udp: {}", err))).unwrap(); + Err(_) => { + state_tx.send(RuntimeState::Connecting).unwrap(); // todo log } }, None => break @@ -29,15 +48,31 @@ pub async fn transport_sender( } pub async fn transport_listener( - stop_sender: Sender, - mut stop: Receiver, + state_tx: Sender, transport: Arc, data_receiver: mpsc::Sender ) { + let mut state_wait_timer = tokio::time::interval(Duration::from_secs(1)); + + let mut state_rx = state_tx.subscribe(); + let mut is_connected = false; let mut transport_buffer = [0u8; 65536]; loop { + if !is_connected && !state_rx.has_changed().unwrap() { + state_wait_timer.tick().await; + continue; + } + tokio::select! { - _ = stop.recv() => break, + _ = state_rx.changed() => match state_rx.borrow().deref() { + RuntimeState::Error(_) => break, + RuntimeState::Connecting => { + is_connected = false; + }, + RuntimeState::Connected(_) => { + is_connected = true; + } + }, result = transport.recv(&mut transport_buffer) => match result { Ok(n) => { debug!("received transport packet with {} bytes", n); @@ -71,9 +106,7 @@ pub async fn transport_listener( } } } - Err(err) => { - stop_sender.send(RuntimeError::IO(format!("failed to receive transport: {}", err))).unwrap(); - } + Err(_) => state_tx.send(RuntimeState::Connecting).unwrap() // todo log } } } diff --git a/crates/client/src/runtime/worker/tun.rs b/crates/client/src/runtime/worker/tun.rs index 16d3e79..7a022eb 100644 --- a/crates/client/src/runtime/worker/tun.rs +++ b/crates/client/src/runtime/worker/tun.rs @@ -1,23 +1,33 @@ +use std::ops::Deref; use std::sync::Arc; -use tokio::sync::broadcast::{Receiver, Sender}; +use std::time::Duration; +use tokio::sync::watch::Sender; use tokio::sync::mpsc; use tracing::{error, warn}; use tun_rs::AsyncDevice; use crate::runtime::error::RuntimeError; +use crate::runtime::state::RuntimeState; pub async fn tun_sender( - stop_sender: Sender, - mut stop: Receiver, + state_tx: Sender, tun: Arc, mut queue: mpsc::Receiver> ) { + let mut state_rx = state_tx.subscribe(); loop { tokio::select! { - _ = stop.recv() => break, + _ = state_rx.changed() => { + match state_rx.borrow().deref() { + RuntimeState::Error(_) => break, + _ => continue + } + }, result = queue.recv() => match result { Some(packet) => { if let Err(err) = tun.send(&packet).await { - stop_sender.send(RuntimeError::IO(format!("failed to send tun: {}", err))).unwrap(); + state_tx.send(RuntimeState::Error( + RuntimeError::IO(format!("failed to send tun: {}", err)) + )).unwrap(); } }, None => break @@ -27,15 +37,33 @@ pub async fn tun_sender( } pub async fn tun_listener( - stop_sender: Sender, - mut stop: Receiver, + state_tx: Sender, tun: Arc, queue: mpsc::Sender> ) { + let mut state_wait_timer = tokio::time::interval(Duration::from_secs(1)); + + let mut state_rx = state_tx.subscribe(); + let mut is_connected = false; let mut buffer = [0u8; 65536]; loop { + if !is_connected && !state_rx.has_changed().unwrap() { + state_wait_timer.tick().await; + continue; + } + tokio::select! { - _ = stop.recv() => break, + _ = state_rx.changed() => { + match state_rx.borrow().deref() { + RuntimeState::Error(_) => break, + RuntimeState::Connecting => { + is_connected = false; + }, + RuntimeState::Connected(_) => { + is_connected = true; + } + } + }, result = tun.recv(&mut buffer) => match result { Ok(n) => { if n == 0 { @@ -51,7 +79,9 @@ pub async fn tun_listener( } } Err(err) => { - stop_sender.send(RuntimeError::IO(format!("failed to receive tun: {}",err))).unwrap(); + state_tx.send(RuntimeState::Error( + RuntimeError::IO(format!("failed to receive tun: {}",err)) + )).unwrap(); } } } diff --git a/crates/server/Cargo.toml b/crates/server/Cargo.toml index e3d13d2..6cc5605 100644 --- a/crates/server/Cargo.toml +++ b/crates/server/Cargo.toml @@ -8,7 +8,7 @@ autobins = false default = ["cli"] udp = ["socket2"] ws = ["tokio-tungstenite", "futures", "socket2"] -cli = ["clap", "inquire", "anstyle", "ctrlc", "rocksdb"] +cli = ["clap", "inquire", "anstyle", "ctrlc", "fjall"] [[bin]] name = "server" @@ -27,10 +27,10 @@ chrono = { workspace = true } etherparse = { workspace = true } serde = { workspace = true } anyhow = { workspace = true } -thiserror = "1.0" +thiserror = "2.0.12" dashmap = "7.0.0-rc2" bincode = { workspace = true } -derive_more = { version = "1.0", features = ["display"] } +derive_more = { version = "2.0.1", features = ["display"] } toml = "0.8.19" # logging @@ -42,7 +42,7 @@ tracing-appender = { workspace = true } snow = { workspace = true } # IO -rocksdb = { version = "0.23", features = ["multi-threaded-cf"], optional = true } +fjall = { version = "2.10.0", optional = true } tun-rs = { workspace = true } tokio = { workspace = true } socket2 = { workspace = true, optional = true } diff --git a/crates/server/src/bin/command/start.rs b/crates/server/src/bin/command/start.rs index a8c0850..a5bbc78 100644 --- a/crates/server/src/bin/command/start.rs +++ b/crates/server/src/bin/command/start.rs @@ -38,7 +38,13 @@ impl StartCmd { } let clients = match database(&config.general.storage) { - Ok(db) => Clients::new(db), + Ok(db) => match Clients::new(db) { + Ok(store) => store, + Err(err) => { + error!("failed to create client storage: {}", err); + process::exit(1); + } + }, Err(err) => { error!("load storage: {}", err); process::exit(1); diff --git a/crates/server/src/bin/command/users/add.rs b/crates/server/src/bin/command/users/add.rs index 44e9344..d3f9611 100644 --- a/crates/server/src/bin/command/users/add.rs +++ b/crates/server/src/bin/command/users/add.rs @@ -68,7 +68,7 @@ impl AddCmd { println!("Private key {}", format_opaque_bytes(sk.as_slice())); println!("Pre-shared key {}", format_opaque_bytes(psk.as_slice())); - let clients = Clients::new(database(&config.general.storage)?); + let clients = Clients::new(database(&config.general.storage)?)?; clients.save(Client { psk: psk.clone(), peer_pk: PublicKey::derive_from(sk.clone()), diff --git a/crates/server/src/bin/command/users/list.rs b/crates/server/src/bin/command/users/list.rs index 4c53f9c..7308aeb 100644 --- a/crates/server/src/bin/command/users/list.rs +++ b/crates/server/src/bin/command/users/list.rs @@ -15,7 +15,7 @@ pub struct ListCmd; impl ListCmd { pub async fn exec(self, config: Config) -> anyhow::Result<()> { - let clients = Clients::new(database(&config.general.storage)?); + let clients = Clients::new(database(&config.general.storage)?)?; let users: Vec<_> = clients.get_all().await.iter().map(|client| { UserRow { pk: client.peer_pk.to_hex(), diff --git a/crates/server/src/bin/command/users/remove.rs b/crates/server/src/bin/command/users/remove.rs index a548b68..cac273e 100644 --- a/crates/server/src/bin/command/users/remove.rs +++ b/crates/server/src/bin/command/users/remove.rs @@ -16,7 +16,7 @@ impl RemoveCmd { anyhow::anyhow!("failed to parse public key: {}", error) })?; - let clients = Clients::new(database(&config.general.storage)?); + let clients = Clients::new(database(&config.general.storage)?)?; clients.delete(&pk).await?; println!("Client has been successfully removed"); Ok(()) diff --git a/crates/server/src/bin/storage/clients.rs b/crates/server/src/bin/storage/clients.rs index 8e10a70..1f2d61e 100644 --- a/crates/server/src/bin/storage/clients.rs +++ b/crates/server/src/bin/storage/clients.rs @@ -1,8 +1,7 @@ use chrono::{DateTime, Utc}; -use rocksdb::DB; +use fjall::{Keyspace, PartitionCreateOptions, PartitionHandle}; use serde::{Deserialize, Serialize}; use shared::keys::handshake::{PublicKey, SecretKey}; -use std::sync::Arc; use tokio::task; #[derive(Serialize, Deserialize)] @@ -15,16 +14,18 @@ pub struct Client { #[derive(Clone)] pub struct Clients { - pub db: Arc, + pub db: PartitionHandle } impl Clients { - pub fn new(db: DB) -> Self { - Self { db: Arc::new(db) } + pub fn new(db: Keyspace) -> anyhow::Result { + let items = db.open_partition("clients", PartitionCreateOptions::default())?; + + Ok(Self { db: items }) } pub async fn get(&self, pk: &PublicKey) -> Option { - let db = Arc::clone(&self.db); + let db = self.db.clone(); let pk = pk.clone(); // todo fix task::spawn_blocking(move || { @@ -41,10 +42,10 @@ impl Clients { } pub async fn get_all(&self) -> Vec { - let db = Arc::clone(&self.db); + let db = self.db.clone(); task::spawn_blocking(move || { - db.iterator(rocksdb::IteratorMode::Start).map(|result| match result { + db.iter().map(|result| match result { Ok((_, value)) => match bincode::serde::decode_from_slice( &value, bincode::config::standard() @@ -61,25 +62,25 @@ impl Clients { } pub async fn save(&self, client: Client) { - let db = Arc::clone(&self.db); + let db = self.db.clone(); let data = bincode::serde::encode_to_vec( &client, bincode::config::standard() ).expect("serialize client"); task::spawn_blocking(move || { - db.put(*client.peer_pk, &data).expect("save client to db"); + db.insert(*client.peer_pk, &data).expect("save client to db"); }) .await .unwrap() } pub async fn delete(&self, pk: &PublicKey) -> anyhow::Result<()> { - let db = Arc::clone(&self.db); + let db = self.db.clone(); let pk = pk.clone(); // todo fix task::spawn_blocking(move || { - db.delete(pk.as_slice()) + db.remove(pk.as_slice()) .map_err(anyhow::Error::from) }) .await? diff --git a/crates/server/src/bin/storage/mod.rs b/crates/server/src/bin/storage/mod.rs index 3a9f3f4..413485e 100644 --- a/crates/server/src/bin/storage/mod.rs +++ b/crates/server/src/bin/storage/mod.rs @@ -1,4 +1,5 @@ -use rocksdb::DB; +use fjall::{Keyspace, Config}; + use std::path::Path; mod clients; @@ -8,10 +9,6 @@ pub use clients::{ Clients }; -pub fn database(path: &Path) -> anyhow::Result { - let mut opts = rocksdb::Options::default(); - opts.create_if_missing(true); - DB::open(&opts, path).map_err(|error| { - anyhow::anyhow!("failed to open database: {}", error) - }) +pub fn database(path: &Path) -> anyhow::Result { + Ok(Config::new(path).open()?) } \ No newline at end of file diff --git a/crates/server/src/runtime/mod.rs b/crates/server/src/runtime/mod.rs index 0ae82ed..abbcd0f 100644 --- a/crates/server/src/runtime/mod.rs +++ b/crates/server/src/runtime/mod.rs @@ -86,10 +86,23 @@ impl Runtime { let tun = setup_tun( &self.tun_name, self.tun_mtu, - self.tun_ip, - self.tun_prefix, true ).await.map_err(|err| vec![RuntimeError::from(err)])?; + + match self.tun_ip { + IpAddr::V4(addr) => { + tun.set_network_address(addr, self.tun_prefix, None) + .map_err(|err| vec![RuntimeError::Tun( + format!("failed to set network address: {}", err) + )])?; + } + IpAddr::V6(addr) => { + tun.add_address_v6(addr, self.tun_prefix) + .map_err(|err| vec![RuntimeError::Tun( + format!("failed to set network address: {}", err) + )])?; + } + } let mut transports: Vec> = match () { #[cfg(feature = "udp")] @@ -197,4 +210,4 @@ impl Runtime { panic!("all workers stopped unexpectedly"); } -} \ No newline at end of file +} diff --git a/crates/shared/src/protocol/handshake.rs b/crates/shared/src/protocol/handshake.rs index b10a41b..e79d7cd 100644 --- a/crates/shared/src/protocol/handshake.rs +++ b/crates/shared/src/protocol/handshake.rs @@ -8,7 +8,7 @@ pub enum HandshakeResponderBody { Disconnect(HandshakeError) } -#[derive(Serialize, Deserialize)] +#[derive(Serialize, Deserialize, Clone, Debug)] pub struct HandshakeResponderPayload { pub sid: SessionId, pub ipaddr: IpAddr diff --git a/crates/shared/src/tun.rs b/crates/shared/src/tun.rs index 28b8635..10e066e 100644 --- a/crates/shared/src/tun.rs +++ b/crates/shared/src/tun.rs @@ -1,8 +1,7 @@ use std::io; -use std::net::IpAddr; use tun_rs::AsyncDevice; -pub async fn setup_tun>(name: S, mtu: u16, ip: IpAddr, prefix: u8, multiple: bool) -> io::Result { +pub async fn setup_tun>(name: S, mtu: u16, multiple: bool) -> io::Result { let mut config = tun_rs::DeviceBuilder::default() .name(name) .mtu(mtu) @@ -15,8 +14,5 @@ pub async fn setup_tun>(name: S, mtu: u16, ip: IpAddr, prefix: u config = config.packet_information(false); } - match ip { - IpAddr::V4(addr) => config.ipv4(addr, prefix, None), - IpAddr::V6(addr) => config.ipv6(addr, prefix), - }.build_async() + config.build_async() }