From fbc78adcc0e2f19058f9ce5d5e0976540e77ee3d Mon Sep 17 00:00:00 2001 From: LimpidCrypto Date: Wed, 30 Aug 2023 10:33:31 +0200 Subject: [PATCH 1/9] add embedded-io-async support --- Cargo.toml | 42 ++++ examples/client_async_embedded_io_async.rs | 157 +++++++++++++ src/framer_async.rs | 258 +++++++++++++++++++++ 3 files changed, 457 insertions(+) create mode 100644 examples/client_async_embedded_io_async.rs diff --git a/Cargo.toml b/Cargo.toml index 86fac1d..ed72b9a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -18,15 +18,57 @@ httparse = { version = "1.8.0", default-features = false } rand_core = { version = "0.6.4", default-features = false } base64 = { version = "0.13.1", default-features = false } futures = { version = "0.3.28", default-features = false } +embedded-io-async = { version = "0.5.0", default-features = false, optional = true} [dev-dependencies] rand = "0.8.5" bytes = "1.4.0" tokio = { version = "1.28.2", features = ["macros", "rt-multi-thread"] } tokio-util = { version = "0.7.8", features = ["net", "codec"] } +clap = { version = "3.0.0-beta.5", features = ["derive"] } +log = "0.4.14" +env_logger = "0.9.0" +static_cell = { version = "1.1", features = ["nightly"]} + +[dependencies.embassy-executor] +git = "https://github.com/embassy-rs/embassy.git" +package = "embassy-executor" +version = "0.3.0" +# rev = "f8299d10f7c0387416989e00acc02d99661537fb" +features = ["arch-std", "executor-thread", "log", "nightly", "integrated-timers"] + +[dependencies.embassy-time] +git = "https://github.com/embassy-rs/embassy.git" +package = "embassy-time" +# rev = "a2c718f61c0587e19d10c5c86ff4cd79272e2b87" +features = ["log", "std", "nightly"] + +[dependencies.embassy-net] +git = "https://github.com/embassy-rs/embassy.git" +package = "embassy-net" +version = "0.1.0" +# rev = "b5748524f86f809d9c8dc2c5b4bb3f07e55dbda1" +features = ["std", "nightly", "log", "medium-ethernet", "medium-ip", "tcp", "udp", "dns", "dhcpv4", "proto-ipv6"] + +[dependencies.embassy-net-driver] +git = "https://github.com/embassy-rs/embassy.git" +package = "embassy-net-driver" +# rev = "83ff3cbc69875f93c5a9bb36825c12df39f04f71" + +[dev-dependencies.embassy-net-tuntap] +git = "https://github.com/embassy-rs/embassy.git" +version = "0.1.0" +package = "embassy-net-tuntap" +# rev = "4d60c715e683aaadf25d9f066bde805c725fefb4" # see readme for no_std support [features] default = ["std"] # default = [] std = [] +embedded-io-async = ["dep:embedded-io-async"] + +[[example]] +name = "client_async_embedded_io_async" +path = "examples/client_async_embedded_io_async.rs" +required-features = ["embedded-io-async"] diff --git a/examples/client_async_embedded_io_async.rs b/examples/client_async_embedded_io_async.rs new file mode 100644 index 0000000..bc94378 --- /dev/null +++ b/examples/client_async_embedded_io_async.rs @@ -0,0 +1,157 @@ +#![feature(type_alias_impl_trait)] + +use std::default::Default; + +use clap::Parser; +use embassy_executor::{Executor, Spawner}; +use embassy_net::tcp::TcpSocket; +use embassy_net::{Config, Ipv4Address, Ipv4Cidr, Stack, StackResources}; +use embassy_net_tuntap::TunTapDevice; +use embassy_time::Duration; +use embedded_websocket::framer_async::{Framer, ReadResult}; +use embedded_websocket::{ + WebSocketClient, WebSocketCloseStatusCode, WebSocketOptions, WebSocketSendMessageType, +}; +use heapless::Vec; +use log::*; +use rand_core::{OsRng, RngCore}; +use static_cell::{make_static, StaticCell}; + +#[derive(Parser)] +#[clap(version = "1.0")] +struct Opts { + /// TAP device name + #[clap(long, default_value = "tap0")] + tap: String, + /// use a static IP instead of DHCP + #[clap(long)] + static_ip: bool, +} + +#[embassy_executor::task] +async fn net_task(stack: &'static Stack) -> ! { + stack.run().await +} + +#[embassy_executor::task] +async fn main_task(spawner: Spawner) { + let opts: Opts = Opts::parse(); + + // Init network device + let device = TunTapDevice::new(&opts.tap).unwrap(); + + // Choose between dhcp or static ip + let config = if opts.static_ip { + Config::ipv4_static(embassy_net::StaticConfigV4 { + address: Ipv4Cidr::new(Ipv4Address::new(172, 0, 0, 1), 24), + dns_servers: Vec::new(), + gateway: Some(Ipv4Address::new(192, 168, 69, 1)), + }) + } else { + Config::dhcpv4(Default::default()) + }; + + // Generate random seed + let mut seed = [0; 8]; + OsRng.fill_bytes(&mut seed); + let seed = u64::from_le_bytes(seed); + + // Init network stack + let stack = &*make_static!(Stack::new( + device, + config, + make_static!(StackResources::<3>::new()), + seed + )); + + // Launch network task + spawner.spawn(net_task(stack)).unwrap(); + + // Then we can use it! + let mut rx_buffer = [0; 4096]; + let mut tx_buffer = [0; 4096]; + let mut ws_buffer = [0u8; 4000]; + let mut socket = TcpSocket::new(stack, &mut rx_buffer, &mut tx_buffer); + + socket.set_timeout(Some(Duration::from_secs(10))); + + let remote_endpoint = (Ipv4Address::new(172, 0, 0, 1), 1337); + info!("connecting to {:?}...", remote_endpoint); + let r = socket.connect(remote_endpoint).await; + if let Err(e) = r { + warn!("connect error: {:?}", e); + return; + } + info!("connected!"); + + let websocket = WebSocketClient::new_client(rand::thread_rng()); + + // initiate a websocket opening handshake + let websocket_options = WebSocketOptions { + path: "/chat", + host: "localhost", + origin: "http://localhost:1337", + sub_protocols: None, + additional_headers: None, + }; + + let mut framer = Framer::new(websocket); + + framer + .connect(&mut socket, &mut ws_buffer, &websocket_options) + .await + .unwrap(); + + info!("ws handshake complete"); + + framer + .write( + &mut socket, + &mut ws_buffer, + WebSocketSendMessageType::Text, + true, + "Hello, world".as_bytes(), + ) + .await + .unwrap(); + + info!("sent message"); + + while let Some(read_result) = framer.read(&mut socket, &mut ws_buffer).await { + let read_result = read_result.unwrap(); + match read_result { + ReadResult::Text(text) => { + info!("received text: {text}"); + + framer + .close( + &mut socket, + &mut ws_buffer, + WebSocketCloseStatusCode::NormalClosure, + None, + ) + .await + .unwrap() + } + _ => { // ignore other kinds of messages + } + } + } + + info!("closed"); +} + +static EXECUTOR: StaticCell = StaticCell::new(); + +fn main() { + env_logger::builder() + .filter_level(log::LevelFilter::Debug) + .filter_module("async_io", log::LevelFilter::Info) + .format_timestamp_nanos() + .init(); + + let executor = EXECUTOR.init(Executor::new()); + executor.run(|spawner| { + spawner.spawn(main_task(spawner)).unwrap(); + }); +} diff --git a/src/framer_async.rs b/src/framer_async.rs index 20a0063..498ec1f 100644 --- a/src/framer_async.rs +++ b/src/framer_async.rs @@ -1,5 +1,7 @@ use core::{fmt::Debug, ops::Deref, str::Utf8Error}; +#[cfg(feature = "embedded-io-async")] +use embedded_io_async::{ErrorType, Read, Write}; use futures::{Sink, SinkExt, Stream, StreamExt}; use rand_core::RngCore; @@ -50,6 +52,7 @@ where rx_remainder_len: usize, } +#[cfg(not(feature = "embedded-io-async"))] impl Framer where TRng: RngCore, @@ -111,6 +114,7 @@ where } } +#[cfg(not(feature = "embedded-io-async"))] impl Framer where TRng: RngCore, @@ -306,3 +310,257 @@ where None } } + +#[cfg(feature = "embedded-io-async")] +impl Framer +where + TRng: RngCore, +{ + pub async fn connect<'a, S>( + &mut self, + stream: &mut S, + buffer: &'a mut [u8], + websocket_options: &WebSocketOptions<'_>, + ) -> Result, FramerError<::Error>> + where + S: Read + Write + Unpin, + { + let (tx_len, web_socket_key) = self + .websocket + .client_connect(websocket_options, buffer) + .map_err(FramerError::WebSocket)?; + + let (tx_buf, _rx_buf) = buffer.split_at_mut(tx_len); + stream.write(tx_buf).await.map_err(FramerError::Io)?; + stream.flush().await.map_err(FramerError::Io)?; + + loop { + let read_len = stream.read(buffer).await.map_err(FramerError::Io)?; + + match self.websocket.client_accept(&web_socket_key, buffer) { + Ok((len, sub_protocol)) => { + // "consume" the HTTP header that we have read from the stream + // read_cursor would be 0 if we exactly read the HTTP header from the stream and nothing else + + // copy the remaining bytes to the end of the rx_buf (which is also the end of the buffer) because they are the contents of the next websocket frame(s) + let from = len; + let to = read_len; + let remaining_len = to - from; + + if remaining_len > 0 { + // let rx_start = read_len - remaining_len; + // rx_buf[rx_start..].copy_from_slice(&buf[from..to]); + self.rx_remainder_len = remaining_len; + } + + return Ok(sub_protocol); + } + Err(crate::Error::HttpHeaderIncomplete) => { + // TODO: continue reading HTTP header in loop + panic!("oh no"); + } + Err(e) => { + return Err(FramerError::WebSocket(e)); + } + } + } + } +} + +#[cfg(feature = "embedded-io-async")] +impl Framer +where + TRng: RngCore, + TWebSocketType: WebSocketType, +{ + pub fn new(websocket: WebSocket) -> Self { + Self { + websocket, + frame_cursor: 0, + rx_remainder_len: 0, + } + } + + pub fn encode( + &mut self, + message_type: WebSocketSendMessageType, + end_of_message: bool, + from: &[u8], + to: &mut [u8], + ) -> Result> { + let len = self + .websocket + .write(message_type, end_of_message, from, to) + .map_err(FramerError::WebSocket)?; + + Ok(len) + } + + pub async fn write<'b, T>( + &mut self, + tx: &mut T, + tx_buf: &'b mut [u8], + message_type: WebSocketSendMessageType, + end_of_message: bool, + frame_buf: &[u8], + ) -> Result<(), FramerError<::Error>> + where + T: Write + Unpin, + { + let len = self + .websocket + .write(message_type, end_of_message, frame_buf, tx_buf) + .map_err(FramerError::WebSocket)?; + + tx.write(&tx_buf[..len]) + .await + .map_err(FramerError::Io) + .unwrap(); + tx.flush().await.map_err(FramerError::Io).unwrap(); + Ok(()) + } + + pub async fn close<'b, T>( + &mut self, + tx: &mut T, + tx_buf: &'b mut [u8], + close_status: WebSocketCloseStatusCode, + status_description: Option<&str>, + ) -> Result<(), FramerError<::Error>> + where + T: Write + Unpin, + { + let len = self + .websocket + .close(close_status, status_description, tx_buf) + .map_err(FramerError::WebSocket)?; + + tx.write(&tx_buf[..len]) + .await + .map_err(FramerError::Io) + .unwrap(); + tx.flush().await.map_err(FramerError::Io).unwrap(); + Ok(()) + } + + // NOTE: any unused bytes read from the stream but not decoded are stored at the end + // of the buffer to be used next time this read function is called. This also applies to + // any unused bytes read when the connect handshake was made. Therefore it is important that + // the caller does not clear this buffer between calls or use it for anthing other than reads. + pub async fn read<'a, S>( + &mut self, + stream: &mut S, + buffer: &'a mut [u8], + ) -> Option, FramerError<::Error>>> + where + S: Read + Write + Unpin, + { + if self.rx_remainder_len == 0 { + match stream.read(buffer).await { + Ok(read_len) => { + if buffer.len() < read_len { + return Some(Err(FramerError::RxBufferTooSmall(read_len))); + } + + self.rx_remainder_len = read_len + } + Err(error) => { + return Some(Err(FramerError::Io(error))); + } + } + } + + let rx_start = buffer.len() - self.rx_remainder_len; + let (frame_buf, rx_buf) = buffer.split_at_mut(rx_start); + + let ws_result = match self.websocket.read(rx_buf, frame_buf) { + Ok(ws_result) => ws_result, + Err(e) => return Some(Err(FramerError::WebSocket(e))), + }; + + self.rx_remainder_len -= ws_result.len_from; + + match ws_result.message_type { + WebSocketReceiveMessageType::Binary => { + self.frame_cursor += ws_result.len_to; + if ws_result.end_of_message { + let range = 0..self.frame_cursor; + self.frame_cursor = 0; + return Some(Ok(ReadResult::Binary(&frame_buf[range]))); + } + } + WebSocketReceiveMessageType::Text => { + self.frame_cursor += ws_result.len_to; + if ws_result.end_of_message { + let range = 0..self.frame_cursor; + self.frame_cursor = 0; + match core::str::from_utf8(&frame_buf[range]) { + Ok(text) => return Some(Ok(ReadResult::Text(text))), + Err(e) => return Some(Err(FramerError::Utf8(e))), + } + } + } + WebSocketReceiveMessageType::CloseMustReply => { + let range = self.frame_cursor..self.frame_cursor + ws_result.len_to; + + // create a tx_buf from the end of the frame_buf + let tx_buf_len = ws_result.len_to + 14; // for extra websocket header + let split_at = frame_buf.len() - tx_buf_len; + let (frame_buf, tx_buf) = frame_buf.split_at_mut(split_at); + + match self.websocket.write( + WebSocketSendMessageType::CloseReply, + true, + &frame_buf[range.start..range.end], + tx_buf, + ) { + Ok(len) => match stream.write(&tx_buf[..len]).await { + Ok(_write_len) => { + self.frame_cursor = 0; + let status_code = ws_result + .close_status + .expect("close message must have code"); + let reason = &frame_buf[range]; + return Some(Ok(ReadResult::Close(CloseMessage { + status_code, + reason, + }))); + } + Err(e) => return Some(Err(FramerError::Io(e))), + }, + Err(e) => return Some(Err(FramerError::WebSocket(e))), + } + } + WebSocketReceiveMessageType::CloseCompleted => return None, + WebSocketReceiveMessageType::Pong => { + let range = self.frame_cursor..self.frame_cursor + ws_result.len_to; + return Some(Ok(ReadResult::Pong(&frame_buf[range]))); + } + WebSocketReceiveMessageType::Ping => { + let range = self.frame_cursor..self.frame_cursor + ws_result.len_to; + + // create a tx_buf from the end of the frame_buf + let tx_buf_len = ws_result.len_to + 14; // for extra websocket header + let split_at = frame_buf.len() - tx_buf_len; + let (frame_buf, tx_buf) = frame_buf.split_at_mut(split_at); + + match self.websocket.write( + WebSocketSendMessageType::Pong, + true, + &frame_buf[range.start..range.end], + tx_buf, + ) { + Ok(len) => match stream.write(&tx_buf[..len]).await { + Ok(_write_len) => { + return Some(Ok(ReadResult::Ping(&frame_buf[range]))); + } + Err(e) => return Some(Err(FramerError::Io(e))), + }, + Err(e) => return Some(Err(FramerError::WebSocket(e))), + } + } + } + + None + } +} From 1b52d60a0df14e44b36daea9c66bf37fec508b89 Mon Sep 17 00:00:00 2001 From: LimpidCrypto Date: Wed, 30 Aug 2023 10:45:47 +0200 Subject: [PATCH 2/9] add embedded-io-async support --- Cargo.toml | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index ed72b9a..f281171 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -30,27 +30,27 @@ log = "0.4.14" env_logger = "0.9.0" static_cell = { version = "1.1", features = ["nightly"]} -[dependencies.embassy-executor] +[dev-dependencies.embassy-executor] git = "https://github.com/embassy-rs/embassy.git" package = "embassy-executor" version = "0.3.0" # rev = "f8299d10f7c0387416989e00acc02d99661537fb" features = ["arch-std", "executor-thread", "log", "nightly", "integrated-timers"] -[dependencies.embassy-time] +[dev-dependencies.embassy-time] git = "https://github.com/embassy-rs/embassy.git" package = "embassy-time" # rev = "a2c718f61c0587e19d10c5c86ff4cd79272e2b87" features = ["log", "std", "nightly"] -[dependencies.embassy-net] +[dev-dependencies.embassy-net] git = "https://github.com/embassy-rs/embassy.git" package = "embassy-net" version = "0.1.0" # rev = "b5748524f86f809d9c8dc2c5b4bb3f07e55dbda1" features = ["std", "nightly", "log", "medium-ethernet", "medium-ip", "tcp", "udp", "dns", "dhcpv4", "proto-ipv6"] -[dependencies.embassy-net-driver] +[dev-dependencies.embassy-net-driver] git = "https://github.com/embassy-rs/embassy.git" package = "embassy-net-driver" # rev = "83ff3cbc69875f93c5a9bb36825c12df39f04f71" From ffe122c4f1df8341b19ac516fa290053f3397ebf Mon Sep 17 00:00:00 2001 From: Kamil Skalski Date: Mon, 30 Oct 2023 10:36:58 +0700 Subject: [PATCH 3/9] Don't send empty Sec-WebSocket-Protocol header. --- src/http.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/http.rs b/src/http.rs index de52008..408a3c5 100644 --- a/src/http.rs +++ b/src/http.rs @@ -149,8 +149,8 @@ pub fn build_connect_handshake_request( http_request.push_str(websocket_options.origin)?; // turn sub protocol list into a CSV list - http_request.push_str("\r\nSec-WebSocket-Protocol: ")?; if let Some(sub_protocols) = websocket_options.sub_protocols { + http_request.push_str("\r\nSec-WebSocket-Protocol: ")?; for (i, sub_protocol) in sub_protocols.iter().enumerate() { http_request.push_str(sub_protocol)?; if i < (sub_protocols.len() - 1) { From fce1c10805bbf90ecebfa78922266bd578936a1e Mon Sep 17 00:00:00 2001 From: Kamil Skalski Date: Mon, 30 Oct 2023 10:49:57 +0700 Subject: [PATCH 4/9] Provide std::error::Error impl in std. --- Cargo.toml | 29 +++++++++++++++++++++++------ src/lib.rs | 21 +++++++++++++++++++++ 2 files changed, 44 insertions(+), 6 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index f281171..8ceca84 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -18,7 +18,7 @@ httparse = { version = "1.8.0", default-features = false } rand_core = { version = "0.6.4", default-features = false } base64 = { version = "0.13.1", default-features = false } futures = { version = "0.3.28", default-features = false } -embedded-io-async = { version = "0.5.0", default-features = false, optional = true} +embedded-io-async = { version = "0.5.0", default-features = false, optional = true } [dev-dependencies] rand = "0.8.5" @@ -28,14 +28,20 @@ tokio-util = { version = "0.7.8", features = ["net", "codec"] } clap = { version = "3.0.0-beta.5", features = ["derive"] } log = "0.4.14" env_logger = "0.9.0" -static_cell = { version = "1.1", features = ["nightly"]} +static_cell = { version = "1.1", features = ["nightly"] } [dev-dependencies.embassy-executor] git = "https://github.com/embassy-rs/embassy.git" package = "embassy-executor" version = "0.3.0" # rev = "f8299d10f7c0387416989e00acc02d99661537fb" -features = ["arch-std", "executor-thread", "log", "nightly", "integrated-timers"] +features = [ + "arch-std", + "executor-thread", + "log", + "nightly", + "integrated-timers", +] [dev-dependencies.embassy-time] git = "https://github.com/embassy-rs/embassy.git" @@ -48,7 +54,18 @@ git = "https://github.com/embassy-rs/embassy.git" package = "embassy-net" version = "0.1.0" # rev = "b5748524f86f809d9c8dc2c5b4bb3f07e55dbda1" -features = ["std", "nightly", "log", "medium-ethernet", "medium-ip", "tcp", "udp", "dns", "dhcpv4", "proto-ipv6"] +features = [ + "std", + "nightly", + "log", + "medium-ethernet", + "medium-ip", + "tcp", + "udp", + "dns", + "dhcpv4", + "proto-ipv6", +] [dev-dependencies.embassy-net-driver] git = "https://github.com/embassy-rs/embassy.git" @@ -64,8 +81,8 @@ package = "embassy-net-tuntap" # see readme for no_std support [features] default = ["std"] -# default = [] -std = [] + +std = ["httparse/std"] embedded-io-async = ["dep:embedded-io-async"] [[example]] diff --git a/src/lib.rs b/src/lib.rs index 432664f..1120530 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -236,6 +236,27 @@ impl From<()> for Error { } } +impl core::fmt::Display for Error { + fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { + match self { + Error::HttpHeader(error) => write!(f, "bad http header {error}"), + Error::HttpResponseCodeInvalid(Some(code)) => write!(f, "bad http response ({code})"), + _ => write!(f, "{:?}", self), + } + } +} + +#[cfg(feature = "std")] +impl std::error::Error for Error { + fn source(&self) -> Option<&(dyn std::error::Error + 'static)> { + if let Self::HttpHeader(error) = self { + Some(error) + } else { + None + } + } +} + #[derive(Copy, Clone, Debug, PartialEq, Eq)] enum WebSocketOpCode { ContinuationFrame = 0, From eb79c990d58514857a7148808d3915aefd512505 Mon Sep 17 00:00:00 2001 From: LimpidCrypto Date: Wed, 14 Aug 2024 13:12:47 +0000 Subject: [PATCH 5/9] add example; improve feature handling; update embedded-io-async --- .devcontainer/devcontainer.json | 31 +++++ .github/dependabot.yml | 12 ++ Cargo.toml | 4 +- examples/client_async_embedded_io_async.rs | 138 ++++----------------- src/framer_async.rs | 6 +- 5 files changed, 76 insertions(+), 115 deletions(-) create mode 100644 .devcontainer/devcontainer.json create mode 100644 .github/dependabot.yml diff --git a/.devcontainer/devcontainer.json b/.devcontainer/devcontainer.json new file mode 100644 index 0000000..f8fff10 --- /dev/null +++ b/.devcontainer/devcontainer.json @@ -0,0 +1,31 @@ +// For format details, see https://aka.ms/devcontainer.json. For config options, see the +// README at: https://github.com/devcontainers/templates/tree/main/src/rust +{ + "name": "Rust", + // Or use a Dockerfile or Docker Compose file. More info: https://containers.dev/guide/dockerfile + "image": "mcr.microsoft.com/devcontainers/rust:1-1-bullseye" + + // Use 'mounts' to make the cargo cache persistent in a Docker Volume. + // "mounts": [ + // { + // "source": "devcontainer-cargo-cache-${devcontainerId}", + // "target": "/usr/local/cargo", + // "type": "volume" + // } + // ] + + // Features to add to the dev container. More info: https://containers.dev/features. + // "features": {}, + + // Use 'forwardPorts' to make a list of ports inside the container available locally. + // "forwardPorts": [], + + // Use 'postCreateCommand' to run commands after the container is created. + // "postCreateCommand": "rustc --version", + + // Configure tool-specific properties. + // "customizations": {}, + + // Uncomment to connect as root instead. More info: https://aka.ms/dev-containers-non-root. + // "remoteUser": "root" +} diff --git a/.github/dependabot.yml b/.github/dependabot.yml new file mode 100644 index 0000000..f33a02c --- /dev/null +++ b/.github/dependabot.yml @@ -0,0 +1,12 @@ +# To get started with Dependabot version updates, you'll need to specify which +# package ecosystems to update and where the package manifests are located. +# Please see the documentation for more information: +# https://docs.github.com/github/administering-a-repository/configuration-options-for-dependency-updates +# https://containers.dev/guide/dependabot + +version: 2 +updates: + - package-ecosystem: "devcontainers" + directory: "/" + schedule: + interval: weekly diff --git a/Cargo.toml b/Cargo.toml index 8ceca84..3d07ff9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -18,7 +18,7 @@ httparse = { version = "1.8.0", default-features = false } rand_core = { version = "0.6.4", default-features = false } base64 = { version = "0.13.1", default-features = false } futures = { version = "0.3.28", default-features = false } -embedded-io-async = { version = "0.5.0", default-features = false, optional = true } +embedded-io-async = { version = "0.6.1", default-features = false, optional = true } [dev-dependencies] rand = "0.8.5" @@ -77,6 +77,8 @@ git = "https://github.com/embassy-rs/embassy.git" version = "0.1.0" package = "embassy-net-tuntap" # rev = "4d60c715e683aaadf25d9f066bde805c725fefb4" +static_cell = { version = "1.1", features = ["nightly"] } +embedded-io-adapters = { version = "0.6.1", features = ["tokio-1"] } # see readme for no_std support [features] diff --git a/examples/client_async_embedded_io_async.rs b/examples/client_async_embedded_io_async.rs index bc94378..aac102c 100644 --- a/examples/client_async_embedded_io_async.rs +++ b/examples/client_async_embedded_io_async.rs @@ -1,89 +1,20 @@ -#![feature(type_alias_impl_trait)] +use embedded_io_adapters::tokio_1::FromTokio; +use std::error::Error; -use std::default::Default; +use tokio::net::TcpStream; -use clap::Parser; -use embassy_executor::{Executor, Spawner}; -use embassy_net::tcp::TcpSocket; -use embassy_net::{Config, Ipv4Address, Ipv4Cidr, Stack, StackResources}; -use embassy_net_tuntap::TunTapDevice; -use embassy_time::Duration; -use embedded_websocket::framer_async::{Framer, ReadResult}; use embedded_websocket::{ + framer_async::{Framer, FramerError, ReadResult}, WebSocketClient, WebSocketCloseStatusCode, WebSocketOptions, WebSocketSendMessageType, }; -use heapless::Vec; -use log::*; -use rand_core::{OsRng, RngCore}; -use static_cell::{make_static, StaticCell}; - -#[derive(Parser)] -#[clap(version = "1.0")] -struct Opts { - /// TAP device name - #[clap(long, default_value = "tap0")] - tap: String, - /// use a static IP instead of DHCP - #[clap(long)] - static_ip: bool, -} - -#[embassy_executor::task] -async fn net_task(stack: &'static Stack) -> ! { - stack.run().await -} - -#[embassy_executor::task] -async fn main_task(spawner: Spawner) { - let opts: Opts = Opts::parse(); - - // Init network device - let device = TunTapDevice::new(&opts.tap).unwrap(); - - // Choose between dhcp or static ip - let config = if opts.static_ip { - Config::ipv4_static(embassy_net::StaticConfigV4 { - address: Ipv4Cidr::new(Ipv4Address::new(172, 0, 0, 1), 24), - dns_servers: Vec::new(), - gateway: Some(Ipv4Address::new(192, 168, 69, 1)), - }) - } else { - Config::dhcpv4(Default::default()) - }; - - // Generate random seed - let mut seed = [0; 8]; - OsRng.fill_bytes(&mut seed); - let seed = u64::from_le_bytes(seed); - - // Init network stack - let stack = &*make_static!(Stack::new( - device, - config, - make_static!(StackResources::<3>::new()), - seed - )); - - // Launch network task - spawner.spawn(net_task(stack)).unwrap(); - - // Then we can use it! - let mut rx_buffer = [0; 4096]; - let mut tx_buffer = [0; 4096]; - let mut ws_buffer = [0u8; 4000]; - let mut socket = TcpSocket::new(stack, &mut rx_buffer, &mut tx_buffer); - - socket.set_timeout(Some(Duration::from_secs(10))); - - let remote_endpoint = (Ipv4Address::new(172, 0, 0, 1), 1337); - info!("connecting to {:?}...", remote_endpoint); - let r = socket.connect(remote_endpoint).await; - if let Err(e) = r { - warn!("connect error: {:?}", e); - return; - } - info!("connected!"); +#[tokio::main] +async fn main() -> Result<(), FramerError> { + // Connect to a peer + let address = "127.0.0.1:1337"; + let mut buffer = [0u8; 4000]; + let tcp_stream = TcpStream::connect(address).await.map_err(FramerError::Io)?; + let mut stream = FromTokio::new(tcp_stream); let websocket = WebSocketClient::new_client(rand::thread_rng()); // initiate a websocket opening handshake @@ -98,60 +29,43 @@ async fn main_task(spawner: Spawner) { let mut framer = Framer::new(websocket); framer - .connect(&mut socket, &mut ws_buffer, &websocket_options) - .await - .unwrap(); + .connect(&mut stream, &mut buffer, &websocket_options) + .await?; - info!("ws handshake complete"); + println!("ws handshake complete"); framer .write( - &mut socket, - &mut ws_buffer, + &mut stream, + &mut buffer, WebSocketSendMessageType::Text, true, "Hello, world".as_bytes(), ) - .await - .unwrap(); + .await?; - info!("sent message"); + println!("sent message"); - while let Some(read_result) = framer.read(&mut socket, &mut ws_buffer).await { - let read_result = read_result.unwrap(); + while let Some(read_result) = framer.read(&mut stream, &mut buffer).await { + let read_result = read_result?; match read_result { ReadResult::Text(text) => { - info!("received text: {text}"); + println!("received text: {text}"); framer .close( - &mut socket, - &mut ws_buffer, + &mut stream, + &mut buffer, WebSocketCloseStatusCode::NormalClosure, None, ) - .await - .unwrap() + .await? } _ => { // ignore other kinds of messages } } } - info!("closed"); -} - -static EXECUTOR: StaticCell = StaticCell::new(); - -fn main() { - env_logger::builder() - .filter_level(log::LevelFilter::Debug) - .filter_module("async_io", log::LevelFilter::Info) - .format_timestamp_nanos() - .init(); - - let executor = EXECUTOR.init(Executor::new()); - executor.run(|spawner| { - spawner.spawn(main_task(spawner)).unwrap(); - }); + println!("closed"); + Ok(()) } diff --git a/src/framer_async.rs b/src/framer_async.rs index 498ec1f..5bfdd10 100644 --- a/src/framer_async.rs +++ b/src/framer_async.rs @@ -1,7 +1,9 @@ -use core::{fmt::Debug, ops::Deref, str::Utf8Error}; - +#[cfg(not(feature = "embedded-io-async"))] +use core::ops::Deref; +use core::{fmt::Debug, str::Utf8Error}; #[cfg(feature = "embedded-io-async")] use embedded_io_async::{ErrorType, Read, Write}; +#[cfg(not(feature = "embedded-io-async"))] use futures::{Sink, SinkExt, Stream, StreamExt}; use rand_core::RngCore; From 0a2a500818382ff5877b6d1cbbea67d327f13943 Mon Sep 17 00:00:00 2001 From: LimpidCrypto Date: Wed, 14 Aug 2024 13:21:47 +0000 Subject: [PATCH 6/9] clean up Cargo.toml --- Cargo.toml | 50 -------------------------------------------------- 1 file changed, 50 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 3d07ff9..b798613 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -28,56 +28,6 @@ tokio-util = { version = "0.7.8", features = ["net", "codec"] } clap = { version = "3.0.0-beta.5", features = ["derive"] } log = "0.4.14" env_logger = "0.9.0" -static_cell = { version = "1.1", features = ["nightly"] } - -[dev-dependencies.embassy-executor] -git = "https://github.com/embassy-rs/embassy.git" -package = "embassy-executor" -version = "0.3.0" -# rev = "f8299d10f7c0387416989e00acc02d99661537fb" -features = [ - "arch-std", - "executor-thread", - "log", - "nightly", - "integrated-timers", -] - -[dev-dependencies.embassy-time] -git = "https://github.com/embassy-rs/embassy.git" -package = "embassy-time" -# rev = "a2c718f61c0587e19d10c5c86ff4cd79272e2b87" -features = ["log", "std", "nightly"] - -[dev-dependencies.embassy-net] -git = "https://github.com/embassy-rs/embassy.git" -package = "embassy-net" -version = "0.1.0" -# rev = "b5748524f86f809d9c8dc2c5b4bb3f07e55dbda1" -features = [ - "std", - "nightly", - "log", - "medium-ethernet", - "medium-ip", - "tcp", - "udp", - "dns", - "dhcpv4", - "proto-ipv6", -] - -[dev-dependencies.embassy-net-driver] -git = "https://github.com/embassy-rs/embassy.git" -package = "embassy-net-driver" -# rev = "83ff3cbc69875f93c5a9bb36825c12df39f04f71" - -[dev-dependencies.embassy-net-tuntap] -git = "https://github.com/embassy-rs/embassy.git" -version = "0.1.0" -package = "embassy-net-tuntap" -# rev = "4d60c715e683aaadf25d9f066bde805c725fefb4" -static_cell = { version = "1.1", features = ["nightly"] } embedded-io-adapters = { version = "0.6.1", features = ["tokio-1"] } # see readme for no_std support From 18aafad9e3c9d46160731ef5d6d83209e27a57a1 Mon Sep 17 00:00:00 2001 From: LimpidCrypto Date: Wed, 14 Aug 2024 13:22:26 +0000 Subject: [PATCH 7/9] clean up Cargo.toml --- Cargo.toml | 3 --- 1 file changed, 3 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index b798613..0367c0d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -25,9 +25,6 @@ rand = "0.8.5" bytes = "1.4.0" tokio = { version = "1.28.2", features = ["macros", "rt-multi-thread"] } tokio-util = { version = "0.7.8", features = ["net", "codec"] } -clap = { version = "3.0.0-beta.5", features = ["derive"] } -log = "0.4.14" -env_logger = "0.9.0" embedded-io-adapters = { version = "0.6.1", features = ["tokio-1"] } # see readme for no_std support From 12385769aac3e3cd3b82535515a3d5ae9d06b5a9 Mon Sep 17 00:00:00 2001 From: LimpidCrypto Date: Wed, 14 Aug 2024 13:23:21 +0000 Subject: [PATCH 8/9] clean up Cargo.toml --- Cargo.toml | 1 - 1 file changed, 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index 0367c0d..f808e53 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -30,7 +30,6 @@ embedded-io-adapters = { version = "0.6.1", features = ["tokio-1"] } # see readme for no_std support [features] default = ["std"] - std = ["httparse/std"] embedded-io-async = ["dep:embedded-io-async"] From 97e43d73b4946cda458bdd65d52cc6f79c431748 Mon Sep 17 00:00:00 2001 From: LimpidCrypto Date: Wed, 14 Aug 2024 17:37:47 +0000 Subject: [PATCH 9/9] fix reading from stream --- src/framer_async.rs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/src/framer_async.rs b/src/framer_async.rs index 5bfdd10..97cd0f6 100644 --- a/src/framer_async.rs +++ b/src/framer_async.rs @@ -472,9 +472,7 @@ where } } - let rx_start = buffer.len() - self.rx_remainder_len; - let (frame_buf, rx_buf) = buffer.split_at_mut(rx_start); - + let (rx_buf, frame_buf) = buffer.split_at_mut(self.rx_remainder_len); let ws_result = match self.websocket.read(rx_buf, frame_buf) { Ok(ws_result) => ws_result, Err(e) => return Some(Err(FramerError::WebSocket(e))),