Skip to content

Commit 5c149e3

Browse files
committed
Support wasm transport.
1 parent 40af9fc commit 5c149e3

File tree

26 files changed

+321
-196
lines changed

26 files changed

+321
-196
lines changed

examples/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ rsocket_rust_transport_tcp = { path = "../rsocket-transport-tcp" }
1010
rsocket_rust_transport_websocket = { path = "../rsocket-transport-websocket" }
1111
log = "0.4.8"
1212
env_logger = "0.7.1"
13-
futures = "0.3.3"
13+
futures = "0.3.4"
1414

1515
[dev-dependencies.tokio]
1616
version = "0.2.11"

rsocket-test/Cargo.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,13 +6,13 @@ edition = "2018"
66

77
[dev-dependencies]
88
log = "0.4"
9-
futures = "0.3.3"
9+
futures = "0.3.4"
1010
env_logger = "0.7.1"
1111
rsocket_rust = { path = "../rsocket", features = ["frame"] }
1212
rsocket_rust_transport_tcp = { path = "../rsocket-transport-tcp" }
1313
rsocket_rust_transport_websocket = { path = "../rsocket-transport-websocket" }
1414
bytes = "0.5.4"
15-
hex = "0.4.0"
15+
hex = "0.4.2"
1616
rand = "0.7.3"
1717

1818
[dev-dependencies.tokio]

rsocket-test/tests/test_clients.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,16 @@ fn init() {
1616
.try_init();
1717
}
1818

19+
#[tokio::main]
20+
#[test]
21+
async fn test_connect_must_failed() {
22+
let result = RSocketFactory::connect()
23+
.transport(TcpClientTransport::from("tcp://127.0.0.1:6789"))
24+
.start()
25+
.await;
26+
assert_eq!(false, result.is_ok());
27+
}
28+
1929
#[test]
2030
fn test_websocket() {
2131
init();

rsocket-test/tests/test_composite_metadata.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use bytes::{Buf, BufMut, Bytes, BytesMut};
1+
use bytes::BytesMut;
22
use rsocket_rust::extension::{CompositeMetadata, Metadata};
33
use rsocket_rust::utils::Writeable;
44

rsocket-transport-tcp/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ edition = "2018"
66

77
[dependencies]
88
log = "0.4.8"
9-
futures = "0.3.3"
9+
futures = "0.3.4"
1010
bytes = "0.5.4"
1111
rsocket_rust = { path = "../rsocket", features = ["frame"] }
1212

rsocket-transport-tcp/src/client.rs

Lines changed: 44 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,13 @@
11
use super::codec::LengthBasedFrameCodec;
22
use futures::{SinkExt, StreamExt};
3+
use rsocket_rust::error::RSocketError;
34
use rsocket_rust::frame::Frame;
4-
use rsocket_rust::transport::{BoxResult, ClientTransport, Rx, SafeFuture, Tx};
5+
use rsocket_rust::runtime::{DefaultSpawner, Spawner};
6+
use rsocket_rust::transport::{ClientTransport, Rx, Tx, TxOnce};
7+
use std::future::Future;
58
use std::net::SocketAddr;
69
use std::net::TcpStream as StdTcpStream;
10+
use std::pin::Pin;
711
use tokio::net::TcpStream;
812
use tokio_util::codec::Framed;
913

@@ -23,37 +27,53 @@ impl TcpClientTransport {
2327
}
2428

2529
#[inline]
26-
async fn connect(self) -> BoxResult<TcpStream> {
30+
async fn connect(self) -> Result<TcpStream, RSocketError> {
2731
match self.connector {
2832
Connector::Direct(stream) => Ok(stream),
2933
Connector::Lazy(addr) => match StdTcpStream::connect(&addr) {
3034
Ok(raw) => match TcpStream::from_std(raw) {
3135
Ok(stream) => Ok(stream),
32-
Err(e) => Err(Box::new(e)),
36+
Err(e) => Err(RSocketError::from(e)),
3337
},
34-
Err(e) => Err(Box::new(e)),
38+
Err(e) => Err(RSocketError::from(e)),
3539
},
3640
}
3741
}
3842
}
3943

4044
impl ClientTransport for TcpClientTransport {
41-
fn attach(self, incoming: Tx<Frame>, mut sending: Rx<Frame>) -> SafeFuture<BoxResult<()>> {
42-
Box::pin(async move {
43-
let socket = self.connect().await?;
44-
let (mut writer, mut reader) = Framed::new(socket, LengthBasedFrameCodec).split();
45-
tokio::spawn(async move {
46-
while let Some(it) = reader.next().await {
47-
incoming.unbounded_send(it.unwrap()).unwrap();
45+
fn attach(
46+
self,
47+
incoming: Tx<Frame>,
48+
mut sending: Rx<Frame>,
49+
connected: Option<TxOnce<Result<(), RSocketError>>>,
50+
) {
51+
DefaultSpawner.spawn(async move {
52+
match self.connect().await {
53+
Ok(socket) => {
54+
if let Some(sender) = connected {
55+
sender.send(Ok(())).unwrap();
56+
}
57+
let (mut writer, mut reader) =
58+
Framed::new(socket, LengthBasedFrameCodec).split();
59+
DefaultSpawner.spawn(async move {
60+
while let Some(it) = reader.next().await {
61+
incoming.unbounded_send(it.unwrap()).unwrap();
62+
}
63+
});
64+
// loop write
65+
while let Some(it) = sending.next().await {
66+
debug!("===> SND: {:?}", &it);
67+
writer.send(it).await.unwrap()
68+
}
69+
}
70+
Err(e) => {
71+
if let Some(sender) = connected {
72+
sender.send(Err(e)).unwrap();
73+
}
4874
}
49-
});
50-
// loop write
51-
while let Some(it) = sending.next().await {
52-
debug!("===> SND: {:?}", &it);
53-
writer.send(it).await.unwrap()
5475
}
55-
Ok(())
56-
})
76+
});
5777
}
5878
}
5979

@@ -65,7 +85,12 @@ impl From<SocketAddr> for TcpClientTransport {
6585

6686
impl From<&str> for TcpClientTransport {
6787
fn from(addr: &str) -> TcpClientTransport {
68-
let socket_addr: SocketAddr = addr.parse().unwrap();
88+
let socket_addr: SocketAddr = if addr.starts_with("tcp://") {
89+
let ss: String = addr.chars().skip(6).collect();
90+
ss.parse().unwrap()
91+
} else {
92+
addr.parse().unwrap()
93+
};
6994
TcpClientTransport::new(Connector::Lazy(socket_addr))
7095
}
7196
}

rsocket-transport-tcp/src/server.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
11
use super::client::TcpClientTransport;
2-
use rsocket_rust::transport::{BoxResult, ClientTransport, SafeFuture, ServerTransport};
2+
use rsocket_rust::transport::{ClientTransport, ServerTransport};
3+
use std::error::Error;
4+
use std::future::Future;
35
use std::net::SocketAddr;
6+
use std::pin::Pin;
47
use tokio::net::TcpListener;
58

69
pub struct TcpServerTransport {
@@ -20,7 +23,7 @@ impl ServerTransport for TcpServerTransport {
2023
self,
2124
starter: Option<fn()>,
2225
acceptor: impl Fn(Self::Item) + Send + Sync + 'static,
23-
) -> SafeFuture<BoxResult<()>>
26+
) -> Pin<Box<dyn Send + Future<Output = Result<(), Box<dyn Send + Sync + Error>>>>>
2427
where
2528
Self::Item: ClientTransport + Sized,
2629
{

rsocket-transport-wasm/Cargo.toml

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,18 @@ edition = "2018"
66

77
[dependencies]
88
bytes = "0.5.4"
9-
wasm-bindgen = "0.2.58"
109
wasm-bindgen-futures = "0.4.8"
10+
futures-channel = "0.3.4"
11+
futures-util = "0.3.4"
1112
js-sys = "0.3.35"
13+
serde = "1.0.104"
14+
serde_derive = "1.0.104"
1215
rsocket_rust = { path = "../rsocket", features = ["frame"] }
1316

17+
[dependencies.wasm-bindgen]
18+
version = "0.2.58"
19+
features = ["serde-serialize"]
20+
1421
[dependencies.web-sys]
1522
version = "0.3.35"
1623
features = [
@@ -20,4 +27,5 @@ features = [
2027
"ErrorEvent",
2128
"MessageEvent",
2229
"WebSocket",
30+
"Event",
2331
]

rsocket-transport-wasm/src/client.rs

Lines changed: 83 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,19 @@
11
use bytes::BytesMut;
2+
use futures_channel::{mpsc, oneshot};
3+
use futures_util::{SinkExt, StreamExt, TryStreamExt};
24
use js_sys::{ArrayBuffer, Uint8Array};
5+
use rsocket_rust::error::RSocketError;
36
use rsocket_rust::frame::Frame;
4-
use rsocket_rust::transport::{BoxResult, ClientTransport, Rx, SafeFuture, Tx};
7+
use rsocket_rust::transport::{ClientTransport, Rx, Tx, TxOnce};
58
use rsocket_rust::utils::Writeable;
69
use std::cell::RefCell;
10+
use std::future::Future;
11+
use std::pin::Pin;
712
use std::rc::Rc;
813
use wasm_bindgen::prelude::*;
914
use wasm_bindgen::JsCast;
10-
use web_sys::{ErrorEvent, FileReader, MessageEvent, ProgressEvent, WebSocket};
15+
use wasm_bindgen_futures::spawn_local;
16+
use web_sys::{ErrorEvent, Event, FileReader, MessageEvent, ProgressEvent, WebSocket};
1117

1218
macro_rules! console_log {
1319
($($t:tt)*) => (log(&format_args!($($t)*).to_string()))
@@ -23,46 +29,83 @@ pub struct WebsocketClientTransport {
2329
url: String,
2430
}
2531

32+
impl WebsocketClientTransport {
33+
#[inline]
34+
fn wait_for_open(ws: &WebSocket) -> impl Future<Output = ()> {
35+
let (sender, receiver) = oneshot::channel();
36+
// The Closure is only called once, so we can use Closure::once
37+
let on_open = Closure::once(move |_e: Event| {
38+
// We don't need to send a value, so we just send ()
39+
sender.send(()).unwrap();
40+
});
41+
ws.set_onopen(Some(on_open.as_ref().unchecked_ref()));
42+
async move {
43+
// Wait for it to open
44+
receiver.await.unwrap();
45+
// Clean up the Closure so we don't leak any memory
46+
drop(on_open);
47+
}
48+
}
49+
}
50+
2651
impl ClientTransport for WebsocketClientTransport {
27-
fn attach(self, incoming: Tx<Frame>, mut sending: Rx<Frame>) -> SafeFuture<BoxResult<()>> {
28-
Box::pin(async move {
29-
let sending = RefCell::new(sending);
30-
// Connect to an echo server
31-
let ws = WebSocket::new(&self.url).unwrap();
32-
33-
// on message
34-
let on_message = Closure::wrap(Box::new(move |e: MessageEvent| {
35-
let data: JsValue = e.data();
36-
read_binary(data, incoming.clone());
37-
}) as Box<dyn FnMut(MessageEvent)>);
38-
ws.set_onmessage(Some(on_message.as_ref().unchecked_ref()));
39-
on_message.forget();
40-
41-
// on error
42-
let on_error = Closure::wrap(Box::new(move |_e: ErrorEvent| {
43-
// TODO: handle error
44-
}) as Box<dyn FnMut(ErrorEvent)>);
45-
ws.set_onerror(Some(on_error.as_ref().unchecked_ref()));
46-
on_error.forget();
47-
48-
// on open
49-
let cloned_ws = ws.clone();
50-
let on_open = Closure::wrap(Box::new(move |_| {
51-
let mut sending = sending.borrow_mut();
52-
while let Ok(Some(f)) = sending.try_next() {
53-
let mut bf = BytesMut::new();
54-
f.write_to(&mut bf);
55-
let mut raw = bf.to_vec();
56-
cloned_ws.send_with_u8_array(&mut raw[..]).unwrap();
52+
fn attach(
53+
self,
54+
incoming: Tx<Frame>,
55+
mut sending: Rx<Frame>,
56+
connected: Option<TxOnce<Result<(), RSocketError>>>,
57+
) {
58+
spawn_local(async move {
59+
match WebSocket::new(&self.url) {
60+
Ok(ws) => {
61+
// on message
62+
let on_message = Closure::wrap(Box::new(move |e: MessageEvent| {
63+
let data: JsValue = e.data();
64+
read_binary(data, incoming.clone());
65+
})
66+
as Box<dyn FnMut(MessageEvent)>);
67+
ws.set_onmessage(Some(on_message.as_ref().unchecked_ref()));
68+
on_message.forget();
69+
70+
// on error
71+
let on_error = Closure::wrap(Box::new(move |e: ErrorEvent| {
72+
console_log!("websocket error: {}", e.message());
73+
})
74+
as Box<dyn FnMut(ErrorEvent)>);
75+
ws.set_onerror(Some(on_error.as_ref().unchecked_ref()));
76+
on_error.forget();
77+
78+
// on_close
79+
let on_close = Closure::once(Box::new(move |_e: Event| {
80+
console_log!("websocket closed");
81+
}) as Box<dyn FnMut(Event)>);
82+
ws.set_onclose(Some(on_close.as_ref().unchecked_ref()));
83+
on_close.forget();
84+
85+
Self::wait_for_open(&ws).await;
86+
87+
if let Some(sender) = connected {
88+
sender.send(Ok(())).unwrap();
89+
}
90+
91+
while let Some(v) = sending.next().await {
92+
let mut bf = BytesMut::new();
93+
v.write_to(&mut bf);
94+
let mut raw = bf.to_vec();
95+
ws.send_with_u8_array(&mut raw[..])
96+
.expect("write data into websocket failed.");
97+
}
98+
console_log!("***** attch end *****");
5799
}
58-
}) as Box<dyn FnMut(JsValue)>);
59-
ws.set_onopen(Some(on_open.as_ref().unchecked_ref()));
60-
on_open.forget();
61-
62-
console_log!("***** attch end *****");
63-
64-
Ok(())
65-
})
100+
Err(e) => {
101+
if let Some(sender) = connected {
102+
sender
103+
.send(Err(RSocketError::from(e.as_string().unwrap())))
104+
.unwrap();
105+
}
106+
}
107+
}
108+
});
66109
}
67110
}
68111

rsocket-transport-wasm/src/lib.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,10 @@
1+
#[macro_use]
2+
extern crate serde_derive;
3+
14
mod client;
25
mod misc;
36
mod runtime;
47

58
pub use client::WebsocketClientTransport;
6-
pub use misc::{JsClient, JsPayload};
9+
pub use misc::{connect, new_payload, JsClient, JsPayload};
710
pub use runtime::WASMSpawner;

0 commit comments

Comments
 (0)