Skip to content

Commit 1322bd4

Browse files
committed
Transport refinement.
1 parent e14df99 commit 1322bd4

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

81 files changed

+1579
-1546
lines changed

Cargo.toml

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,13 +5,12 @@ members = [
55
"rsocket",
66
# transports
77
"rsocket-transport-tcp",
8-
"rsocket-transport-unix",
98
"rsocket-transport-websocket",
109
"rsocket-transport-wasm",
1110
# extra
1211
"rsocket-messaging",
1312
# internal
1413
"examples",
1514
"rsocket-test",
16-
"rsocket-benchmark",
15+
# "rsocket-benchmark",
1716
]

examples/Cargo.toml

Lines changed: 8 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -6,27 +6,24 @@ edition = "2018"
66
publish = false
77

88
[dev-dependencies]
9-
log = "0.4.8"
9+
log = "0.4.11"
1010
env_logger = "0.7.1"
1111
futures = "0.3.5"
12-
clap = "2.33.1"
13-
tokio-postgres = "0.5.4"
14-
redis = "0.16.0"
12+
clap = "2.33.3"
13+
tokio-postgres = "0.5.5"
14+
redis = "0.17.0"
1515

1616
[dev-dependencies.rsocket_rust]
17-
version = "0.5.3"
17+
path = "../rsocket"
1818

1919
[dev-dependencies.rsocket_rust_transport_tcp]
20-
version = "0.5.3"
21-
22-
[dev-dependencies.rsocket_rust_transport_unix]
23-
version = "0.5.3"
20+
path = "../rsocket-transport-tcp"
2421

2522
[dev-dependencies.rsocket_rust_transport_websocket]
26-
version = "0.5.3"
23+
path = "../rsocket-transport-websocket"
2724

2825
[dev-dependencies.tokio]
29-
version = "0.2.21"
26+
version = "0.2.22"
3027
default-features = false
3128
features = ["full"]
3229

@@ -53,11 +50,3 @@ path = "postgres.rs"
5350
[[example]]
5451
name = "redis"
5552
path = "redis.rs"
56-
57-
[[example]]
58-
name = "echo_uds"
59-
path = "echo_uds.rs"
60-
61-
[[example]]
62-
name = "echo_uds_client"
63-
path = "echo_uds_client.rs"

examples/echo.rs

Lines changed: 102 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -3,19 +3,92 @@ extern crate log;
33

44
use clap::{App, Arg, SubCommand};
55
use rsocket_rust::prelude::*;
6-
use rsocket_rust_transport_tcp::{TcpClientTransport, TcpServerTransport};
7-
use std::error::Error;
6+
use rsocket_rust::transport::Connection;
7+
use rsocket_rust_transport_tcp::{
8+
TcpClientTransport, TcpServerTransport, UnixClientTransport, UnixServerTransport,
9+
};
10+
use rsocket_rust_transport_websocket::{WebsocketClientTransport, WebsocketServerTransport};
811
use std::fs;
912

13+
type Result<T> = rsocket_rust::Result<T>;
14+
1015
enum RequestMode {
1116
FNF,
1217
REQUEST,
1318
STREAM,
1419
CHANNEL,
1520
}
1621

22+
async fn serve<A, B>(transport: A, mtu: usize) -> Result<()>
23+
where
24+
A: Send + Sync + ServerTransport<Item = B> + 'static,
25+
B: Send + Sync + Transport + 'static,
26+
{
27+
RSocketFactory::receive()
28+
.transport(transport)
29+
.fragment(mtu)
30+
.acceptor(Box::new(|setup, _socket| {
31+
info!("accept setup: {:?}", setup);
32+
Ok(Box::new(EchoRSocket))
33+
// Or you can reject setup
34+
// Err(From::from("SETUP_NOT_ALLOW"))
35+
}))
36+
.on_start(Box::new(|| info!("+++++++ echo server started! +++++++")))
37+
.serve()
38+
.await
39+
}
40+
41+
async fn connect<A, B>(transport: A, mtu: usize, req: Payload, mode: RequestMode) -> Result<()>
42+
where
43+
A: Send + Sync + Transport<Conn = B> + 'static,
44+
B: Send + Sync + Connection + 'static,
45+
{
46+
let cli = RSocketFactory::connect()
47+
.fragment(mtu)
48+
.transport(transport)
49+
.start()
50+
.await?;
51+
52+
match mode {
53+
RequestMode::FNF => {
54+
cli.fire_and_forget(req).await;
55+
}
56+
RequestMode::STREAM => {
57+
let mut results = cli.request_stream(req);
58+
loop {
59+
match results.next().await {
60+
Some(Ok(v)) => info!("{:?}", v),
61+
Some(Err(e)) => {
62+
error!("STREAM_RESPONSE FAILED: {:?}", e);
63+
break;
64+
}
65+
None => break,
66+
}
67+
}
68+
}
69+
RequestMode::CHANNEL => {
70+
let mut results = cli.request_channel(Box::pin(futures::stream::iter(vec![Ok(req)])));
71+
loop {
72+
match results.next().await {
73+
Some(Ok(v)) => info!("{:?}", v),
74+
Some(Err(e)) => {
75+
error!("CHANNEL_RESPONSE FAILED: {:?}", e);
76+
break;
77+
}
78+
None => break,
79+
}
80+
}
81+
}
82+
RequestMode::REQUEST => {
83+
let res = cli.request_response(req).await.expect("Request failed!");
84+
info!("{:?}", res);
85+
}
86+
}
87+
Ok(())
88+
}
89+
1790
#[tokio::main]
18-
async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
91+
async fn main() -> Result<()> {
1992
env_logger::builder().format_timestamp_millis().init();
2093

2194
let cli = App::new("echo")
@@ -106,18 +179,25 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
106179
.value_of("mtu")
107180
.map(|it| it.parse().expect("Invalid mtu string!"))
108181
.unwrap_or(0);
109-
RSocketFactory::receive()
110-
.transport(TcpServerTransport::from(addr))
111-
.fragment(mtu)
112-
.acceptor(Box::new(|setup, _socket| {
113-
info!("accept setup: {:?}", setup);
114-
Ok(Box::new(EchoRSocket))
115-
// Or you can reject setup
116-
// Err(From::from("SETUP_NOT_ALLOW"))
117-
}))
118-
.on_start(Box::new(|| info!("+++++++ echo server started! +++++++")))
119-
.serve()
120-
.await
182+
183+
if addr.starts_with("ws://") {
184+
serve(WebsocketServerTransport::from(addr), mtu).await
185+
} else if addr.starts_with("unix://") {
186+
let addr_owned = addr.to_owned();
187+
tokio::spawn(async move {
188+
let _ = serve(UnixServerTransport::from(addr_owned), mtu).await;
189+
});
190+
let sockfile = addr.chars().skip(7).collect::<String>();
191+
// Watch signal
192+
tokio::signal::ctrl_c().await?;
193+
info!("ctrl-c received!");
194+
if let Err(e) = std::fs::remove_file(&sockfile) {
195+
error!("remove unix sock file failed: {}", e);
196+
}
197+
Ok(())
198+
} else {
199+
serve(TcpServerTransport::from(addr), mtu).await
200+
}
121201
}
122202
("connect", Some(flags)) => {
123203
let mut modes: Vec<RequestMode> = vec![];
@@ -147,12 +227,6 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
147227
.unwrap_or(0);
148228

149229
let addr = flags.value_of("URL").expect("Missing URL");
150-
let cli = RSocketFactory::connect()
151-
.fragment(mtu)
152-
.transport(TcpClientTransport::from(addr))
153-
.start()
154-
.await
155-
.expect("Connect failed!");
156230
let mut bu = Payload::builder();
157231
if let Some(data) = flags.value_of("input") {
158232
if data.starts_with("@") {
@@ -164,45 +238,14 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
164238
}
165239
}
166240
let req = bu.build();
167-
168-
match modes.pop().unwrap_or(RequestMode::REQUEST) {
169-
RequestMode::FNF => {
170-
cli.fire_and_forget(req).await;
171-
}
172-
RequestMode::STREAM => {
173-
let mut results = cli.request_stream(req);
174-
loop {
175-
match results.next().await {
176-
Some(Ok(v)) => info!("{:?}", v),
177-
Some(Err(e)) => {
178-
error!("STREAM_RESPONSE FAILED: {:?}", e);
179-
break;
180-
}
181-
None => break,
182-
}
183-
}
184-
}
185-
RequestMode::CHANNEL => {
186-
let mut results =
187-
cli.request_channel(Box::pin(futures::stream::iter(vec![Ok(req)])));
188-
loop {
189-
match results.next().await {
190-
Some(Ok(v)) => info!("{:?}", v),
191-
Some(Err(e)) => {
192-
error!("CHANNEL_RESPONSE FAILED: {:?}", e);
193-
break;
194-
}
195-
None => break,
196-
}
197-
}
198-
}
199-
RequestMode::REQUEST => {
200-
let res = cli.request_response(req).await.expect("Request failed!");
201-
info!("{:?}", res);
202-
}
241+
let mode = modes.pop().unwrap_or(RequestMode::REQUEST);
242+
if addr.starts_with("ws://") {
243+
connect(WebsocketClientTransport::from(addr), mtu, req, mode).await
244+
} else if addr.starts_with("unix://") {
245+
connect(UnixClientTransport::from(addr), mtu, req, mode).await
246+
} else {
247+
connect(TcpClientTransport::from(addr), mtu, req, mode).await
203248
}
204-
205-
Ok(())
206249
}
207250
_ => Ok(()),
208251
}

examples/echo_uds.rs

Lines changed: 0 additions & 28 deletions
This file was deleted.

examples/echo_uds_client.rs

Lines changed: 0 additions & 31 deletions
This file was deleted.

examples/postgres.rs

Lines changed: 7 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,14 @@
11
#[macro_use]
22
extern crate log;
33

4-
use rsocket_rust::{error::RSocketError, prelude::*};
4+
use rsocket_rust::prelude::*;
5+
use rsocket_rust::Result;
56
use rsocket_rust_transport_tcp::TcpServerTransport;
6-
use std::error::Error;
77
use std::sync::Arc;
88
use tokio_postgres::{Client as PgClient, NoTls};
99

1010
#[tokio::main]
11-
async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
11+
async fn main() -> Result<()> {
1212
env_logger::builder().format_timestamp_millis().init();
1313
let dao = Dao::try_new().await?;
1414
RSocketFactory::receive()
@@ -25,7 +25,7 @@ struct Dao {
2525
}
2626

2727
impl RSocket for Dao {
28-
fn request_response(&self, _: Payload) -> Mono<Result<Payload, RSocketError>> {
28+
fn request_response(&self, _: Payload) -> Mono<Result<Payload>> {
2929
let client = self.client.clone();
3030
Box::pin(async move {
3131
let row = client
@@ -45,20 +45,17 @@ impl RSocket for Dao {
4545
unimplemented!()
4646
}
4747

48-
fn request_stream(&self, _: Payload) -> Flux<Result<Payload, RSocketError>> {
48+
fn request_stream(&self, _: Payload) -> Flux<Result<Payload>> {
4949
unimplemented!()
5050
}
5151

52-
fn request_channel(
53-
&self,
54-
_: Flux<Result<Payload, RSocketError>>,
55-
) -> Flux<Result<Payload, RSocketError>> {
52+
fn request_channel(&self, _: Flux<Result<Payload>>) -> Flux<Result<Payload>> {
5653
unimplemented!()
5754
}
5855
}
5956

6057
impl Dao {
61-
async fn try_new() -> Result<Dao, Box<dyn Error + Sync + Send>> {
58+
async fn try_new() -> Result<Dao> {
6259
let (client, connection) =
6360
tokio_postgres::connect("host=localhost user=postgres password=postgres", NoTls)
6461
.await?;

0 commit comments

Comments
 (0)