Skip to content

Commit 0439e93

Browse files
committed
feat: redesign RSocket trait based on async_trait
1 parent fbcbe2b commit 0439e93

File tree

16 files changed

+270
-228
lines changed

16 files changed

+270
-228
lines changed

examples/Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,8 @@ env_logger = "0.8.2"
1111
futures = "0.3.8"
1212
clap = "2.33.3"
1313
tokio-postgres = "0.6.0"
14-
redis = "0.17.0"
14+
redis = "0.16.0"
15+
async-trait = "0.1.42"
1516

1617
[dev-dependencies.rsocket_rust]
1718
path = "../rsocket"

examples/cli.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
extern crate log;
33

44
use rsocket_rust::prelude::*;
5+
use rsocket_rust::utils::EchoRSocket;
56
use rsocket_rust::Result;
67
use rsocket_rust_transport_tcp::TcpClientTransport;
78

examples/echo.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ extern crate log;
44
use clap::{App, Arg, SubCommand};
55
use rsocket_rust::prelude::*;
66
use rsocket_rust::transport::Connection;
7+
use rsocket_rust::utils::EchoRSocket;
78
use rsocket_rust::Result;
89
use rsocket_rust_transport_tcp::{
910
TcpClientTransport, TcpServerTransport, UnixClientTransport, UnixServerTransport,
@@ -50,7 +51,7 @@ where
5051

5152
match mode {
5253
RequestMode::FNF => {
53-
cli.fire_and_forget(req).await;
54+
cli.fire_and_forget(req).await?;
5455
}
5556
RequestMode::STREAM => {
5657
let mut results = cli.request_stream(req);
@@ -79,7 +80,7 @@ where
7980
}
8081
}
8182
RequestMode::REQUEST => {
82-
let res = cli.request_response(req).await.expect("Request failed!");
83+
let res = cli.request_response(req).await?;
8384
info!("{:?}", res);
8485
}
8586
}

examples/postgres.rs

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
#[macro_use]
22
extern crate log;
33

4+
use async_trait::async_trait;
45
use rsocket_rust::prelude::*;
56
use rsocket_rust::Result;
67
use rsocket_rust_transport_tcp::TcpServerTransport;
@@ -24,24 +25,23 @@ struct Dao {
2425
client: Arc<PgClient>,
2526
}
2627

28+
#[async_trait]
2729
impl RSocket for Dao {
28-
fn request_response(&self, _: Payload) -> Mono<Result<Payload>> {
30+
async fn request_response(&self, _: Payload) -> Result<Payload> {
2931
let client = self.client.clone();
30-
Box::pin(async move {
31-
let row = client
32-
.query_one("SELECT 'world' AS hello", &[])
33-
.await
34-
.expect("Execute SQL failed!");
35-
let result: String = row.get("hello");
36-
Ok(Payload::builder().set_data_utf8(&result).build())
37-
})
32+
let row = client
33+
.query_one("SELECT 'world' AS hello", &[])
34+
.await
35+
.expect("Execute SQL failed!");
36+
let result: String = row.get("hello");
37+
Ok(Payload::builder().set_data_utf8(&result).build())
3838
}
3939

40-
fn metadata_push(&self, _: Payload) -> Mono<()> {
40+
async fn metadata_push(&self, _: Payload) -> Result<()> {
4141
unimplemented!()
4242
}
4343

44-
fn fire_and_forget(&self, _: Payload) -> Mono<()> {
44+
async fn fire_and_forget(&self, _: Payload) -> Result<()> {
4545
unimplemented!()
4646
}
4747

examples/proxy.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ extern crate log;
33

44
use futures::executor::block_on;
55
use rsocket_rust::prelude::*;
6+
use rsocket_rust::utils::EchoRSocket;
67
use rsocket_rust::Result;
78
use rsocket_rust_transport_tcp::*;
89

examples/redis.rs

Lines changed: 18 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
use async_trait::async_trait;
12
use redis::Client as RedisClient;
23
use rsocket_rust::prelude::*;
34
use rsocket_rust::Result;
@@ -28,33 +29,34 @@ impl FromStr for RedisDao {
2829
}
2930
}
3031

32+
#[async_trait]
3133
impl RSocket for RedisDao {
32-
fn request_response(&self, req: Payload) -> Mono<Result<Payload>> {
34+
async fn request_response(&self, req: Payload) -> Result<Payload> {
3335
let client = self.inner.clone();
34-
35-
Box::pin(async move {
36-
let mut conn: redis::aio::Connection = client
37-
.get_async_connection()
38-
.await
39-
.expect("Connect redis failed!");
40-
let value: String = redis::cmd("GET")
41-
.arg(&[req.data_utf8()])
42-
.query_async(&mut conn)
43-
.await
44-
.unwrap_or("<nil>".to_owned());
45-
Ok(Payload::builder().set_data_utf8(&value).build())
46-
})
36+
let mut conn: redis::aio::Connection = client
37+
.get_async_connection()
38+
.await
39+
.expect("Connect redis failed!");
40+
let value: String = redis::cmd("GET")
41+
.arg(&[req.data_utf8()])
42+
.query_async(&mut conn)
43+
.await
44+
.unwrap_or("<nil>".to_owned());
45+
Ok(Payload::builder().set_data_utf8(&value).build())
4746
}
4847

49-
fn metadata_push(&self, _req: Payload) -> Mono<()> {
48+
async fn metadata_push(&self, _req: Payload) -> Result<()> {
5049
unimplemented!()
5150
}
52-
fn fire_and_forget(&self, _req: Payload) -> Mono<()> {
51+
52+
async fn fire_and_forget(&self, _req: Payload) -> Result<()> {
5353
unimplemented!()
5454
}
55+
5556
fn request_stream(&self, _req: Payload) -> Flux<Result<Payload>> {
5657
unimplemented!()
5758
}
59+
5860
fn request_channel(&self, _reqs: Flux<Result<Payload>>) -> Flux<Result<Payload>> {
5961
unimplemented!()
6062
}

rsocket-messaging/src/requester.rs

Lines changed: 21 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -259,8 +259,7 @@ impl RequestSpec {
259259

260260
pub async fn retrieve(self) -> Result<()> {
261261
let (req, _mime_type, rsocket) = self.preflight()?;
262-
rsocket.fire_and_forget(req).await;
263-
Ok(())
262+
rsocket.fire_and_forget(req).await
264263
}
265264

266265
pub async fn retrieve_mono(self) -> Unpacker {
@@ -364,10 +363,16 @@ fn do_unmarshal<T>(mime_type: &MimeType, raw: &Bytes) -> Result<Option<T>>
364363
where
365364
T: Sized + DeserializeOwned,
366365
{
367-
// TODO: support more mime types
368-
match *mime_type {
369-
MimeType::APPLICATION_JSON => Ok(Some(unmarshal(misc::json(), &raw.as_ref())?)),
370-
MimeType::APPLICATION_CBOR => Ok(Some(unmarshal(misc::cbor(), &raw.as_ref())?)),
366+
match mime_type.as_u8() {
367+
Some(code) => {
368+
if code == MimeType::APPLICATION_JSON.as_u8().unwrap() {
369+
Ok(Some(unmarshal(misc::json(), &raw.as_ref())?))
370+
} else if code == MimeType::APPLICATION_CBOR.as_u8().unwrap() {
371+
Ok(Some(unmarshal(misc::cbor(), &raw.as_ref())?))
372+
} else {
373+
Err(RSocketError::WithDescription("unsupported mime type!".into()).into())
374+
}
375+
}
371376
_ => Err(RSocketError::WithDescription("unsupported mime type!".into()).into()),
372377
}
373378
}
@@ -376,10 +381,16 @@ fn do_marshal<T>(mime_type: &MimeType, data: &T) -> Result<Vec<u8>>
376381
where
377382
T: Sized + Serialize,
378383
{
379-
// TODO: support more mime types
380-
match *mime_type {
381-
MimeType::APPLICATION_JSON => marshal(misc::json(), data),
382-
MimeType::APPLICATION_CBOR => marshal(misc::cbor(), data),
384+
match mime_type.as_u8() {
385+
Some(code) => {
386+
if code == MimeType::APPLICATION_JSON.as_u8().unwrap() {
387+
marshal(misc::json(), data)
388+
} else if code == MimeType::APPLICATION_CBOR.as_u8().unwrap() {
389+
marshal(misc::cbor(), data)
390+
} else {
391+
Err(RSocketError::WithDescription("unsupported mime type!".into()).into())
392+
}
393+
}
383394
_ => Err(RSocketError::WithDescription("unsupported mime type!".into()).into()),
384395
}
385396
}

rsocket-test/tests/test_clients.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ extern crate log;
33

44
use futures::stream;
55
use rsocket_rust::prelude::*;
6+
use rsocket_rust::utils::EchoRSocket;
67
use rsocket_rust::Client;
78
use rsocket_rust_transport_tcp::{
89
TcpClientTransport, TcpServerTransport, UnixClientTransport, UnixServerTransport,
@@ -216,13 +217,13 @@ async fn exec_request_response(socket: &Client) {
216217
async fn exec_metadata_push(socket: &Client) {
217218
let pa = Payload::builder().set_metadata_utf8("Hello World!").build();
218219
// metadata push
219-
socket.metadata_push(pa).await;
220+
let _ = socket.metadata_push(pa).await;
220221
}
221222

222223
async fn exec_fire_and_forget(socket: &Client) {
223224
// request fnf
224225
let fnf = Payload::from("Hello World!");
225-
socket.fire_and_forget(fnf).await;
226+
let _ = socket.fire_and_forget(fnf).await;
226227
}
227228

228229
async fn exec_request_stream(socket: &Client) {

rsocket-transport-wasm/src/misc.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ impl JsClient {
8787
let request: JsPayload = request.into_serde().unwrap();
8888

8989
future_to_promise(async move {
90-
inner.fire_and_forget(request.into()).await;
90+
let _ = inner.fire_and_forget(request.into()).await;
9191
Ok(JsValue::NULL)
9292
})
9393
}

rsocket/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ async-trait = "0.1.42"
1818
dashmap = "3.11.10"
1919
thiserror = "1.0.22"
2020
anyhow = "1.0.36"
21+
async-stream = "0.3.0"
2122

2223
[target.'cfg(target_arch = "wasm32")'.dependencies]
2324
wasm-bindgen-futures = "0.4.19"

0 commit comments

Comments
 (0)