Skip to content

Commit 5c8a8ee

Browse files
jjeffcaiicaiweiwei03
authored andcommitted
New feature: messaging.
1 parent 0cdf6d3 commit 5c8a8ee

40 files changed

+892
-491
lines changed

Cargo.toml

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,15 @@
11
[workspace]
22

33
members = [
4+
# core
45
"rsocket",
6+
# transports
57
"rsocket-transport-tcp",
68
"rsocket-transport-websocket",
79
"rsocket-transport-wasm",
8-
9-
# Internal
10+
# extra
11+
"rsocket-messaging",
12+
# internal
1013
"examples",
1114
"rsocket-test",
1215
]

examples/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ path = "../rsocket-transport-tcp"
2121
path = "../rsocket-transport-websocket"
2222

2323
[dev-dependencies.tokio]
24-
version = "0.2.16"
24+
version = "0.2.19"
2525
default-features = false
2626
features = ["full"]
2727

rsocket-messaging/Cargo.toml

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
[package]
2+
name = "rsocket_rust_messaging"
3+
version = "0.1.0"
4+
authors = ["Jeffsky <jjeffcaii@outlook.com>"]
5+
edition = "2018"
6+
7+
[dependencies]
8+
log = "0.4.8"
9+
futures = "0.3.4"
10+
bytes = "0.5.4"
11+
serde = "1.0.106"
12+
serde_json = "1.0.52"
13+
serde_cbor = "0.11.1"
14+
hex = "0.4.2"
15+
16+
[dependencies.rsocket_rust]
17+
path = "../rsocket"
18+
features = ["frame"]

rsocket-messaging/src/lib.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
#[macro_use]
2+
extern crate log;
3+
4+
mod misc;
5+
mod requester;
6+
7+
pub use requester::{RequestSpec, Requester};

rsocket-messaging/src/misc.rs

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
use bytes::{BufMut, BytesMut};
2+
use rsocket_rust::extension::{MimeType, MIME_APPLICATION_CBOR, MIME_APPLICATION_JSON};
3+
use serde::{Deserialize, Serialize};
4+
use std::error::Error;
5+
6+
pub(crate) fn unmarshal<'a, T>(mime_type: &MimeType, raw: &'a [u8]) -> Result<T, Box<dyn Error>>
7+
where
8+
T: Deserialize<'a>,
9+
{
10+
match *mime_type {
11+
MIME_APPLICATION_JSON => Ok(serde_json::from_slice(raw)?),
12+
MIME_APPLICATION_CBOR => Ok(serde_cbor::from_slice(raw)?),
13+
_ => panic!(""),
14+
}
15+
}
16+
17+
pub(crate) fn marshal<T>(
18+
mime_type: &MimeType,
19+
bf: &mut BytesMut,
20+
data: &T,
21+
) -> Result<(), Box<dyn Error>>
22+
where
23+
T: Sized + Serialize,
24+
{
25+
match *mime_type {
26+
MIME_APPLICATION_JSON => {
27+
let raw = serde_json::to_vec(data)?;
28+
bf.put_slice(&raw[..]);
29+
Ok(())
30+
}
31+
MIME_APPLICATION_CBOR => {
32+
let raw = serde_cbor::to_vec(data)?;
33+
bf.put_slice(&raw[..]);
34+
Ok(())
35+
}
36+
_ => panic!(""),
37+
}
38+
}

rsocket-messaging/src/requester.rs

Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,118 @@
1+
use super::misc::{marshal, unmarshal};
2+
use bytes::{BufMut, BytesMut};
3+
use rsocket_rust::error::RSocketError;
4+
use rsocket_rust::extension::{
5+
CompositeMetadata, MimeType, RoutingMetadata, MIME_APPLICATION_JSON,
6+
MIME_MESSAGE_X_RSOCKET_ROUTING_V0,
7+
};
8+
use rsocket_rust::prelude::*;
9+
use rsocket_rust::utils::Writeable;
10+
use serde::{Deserialize, Serialize};
11+
use std::collections::LinkedList;
12+
use std::error::Error;
13+
14+
pub struct Requester<S>
15+
where
16+
S: RSocket + Clone,
17+
{
18+
rsocket: S,
19+
}
20+
21+
pub struct RequestSpec<S>
22+
where
23+
S: RSocket + Clone,
24+
{
25+
data_buf: BytesMut,
26+
rsocket: S,
27+
data_mime_type: MimeType,
28+
metadatas: LinkedList<(MimeType, Vec<u8>)>,
29+
}
30+
31+
impl<C> Requester<C>
32+
where
33+
C: RSocket + Clone,
34+
{
35+
pub fn new(rsocket: C) -> Requester<C> {
36+
Requester { rsocket }
37+
}
38+
39+
pub fn route(&self, route: &str) -> RequestSpec<C> {
40+
let routing = RoutingMetadata::builder().push_str(route).build();
41+
let mut buf = BytesMut::new();
42+
routing.write_to(&mut buf);
43+
44+
let mut metadatas: LinkedList<(MimeType, Vec<u8>)> = Default::default();
45+
metadatas.push_back((MIME_MESSAGE_X_RSOCKET_ROUTING_V0, buf.to_vec()));
46+
RequestSpec {
47+
data_buf: BytesMut::new(),
48+
rsocket: self.rsocket.clone(),
49+
data_mime_type: MIME_APPLICATION_JSON,
50+
metadatas,
51+
}
52+
}
53+
}
54+
55+
impl<C> RequestSpec<C>
56+
where
57+
C: RSocket + Clone,
58+
{
59+
pub fn metadata<T>(&mut self, metadata: &T, mime_type: &str) -> Result<(), Box<dyn Error>>
60+
where
61+
T: Sized + Serialize,
62+
{
63+
let mime_type = MimeType::from(mime_type);
64+
let mut b = BytesMut::new();
65+
marshal(&mime_type, &mut b, metadata)?;
66+
self.metadatas.push_back((mime_type, b.to_vec()));
67+
Ok(())
68+
}
69+
70+
pub fn data<T>(&mut self, data: &T) -> Result<(), Box<dyn Error>>
71+
where
72+
T: Sized + Serialize,
73+
{
74+
marshal(&self.data_mime_type, &mut self.data_buf, data)
75+
}
76+
77+
pub async fn retrieve_mono(&self) -> Unpacker {
78+
let req = self.to_req();
79+
let res = self.rsocket.request_response(req).await;
80+
Unpacker {
81+
mime_type: self.data_mime_type.clone(),
82+
inner: res,
83+
}
84+
}
85+
86+
fn to_req(&self) -> Payload {
87+
let mut b = BytesMut::new();
88+
let mut c = CompositeMetadata::builder();
89+
for (a, b) in self.metadatas.iter() {
90+
c = c.push(a.clone(), b);
91+
}
92+
c.build().write_to(&mut b);
93+
Payload::builder()
94+
.set_metadata(b.to_vec())
95+
.set_data(self.data_buf.to_vec())
96+
.build()
97+
}
98+
}
99+
100+
pub struct Unpacker {
101+
mime_type: MimeType,
102+
inner: Result<Payload, RSocketError>,
103+
}
104+
105+
impl Unpacker {
106+
pub fn block<'a, T>(&'a self) -> Result<Option<T>, Box<dyn Error>>
107+
where
108+
T: Deserialize<'a>,
109+
{
110+
match &self.inner {
111+
Ok(inner) => match inner.data() {
112+
Some(raw) => Ok(Some(unmarshal(&self.mime_type, &raw.as_ref())?)),
113+
None => Ok(None),
114+
},
115+
Err(e) => Err(format!("{}", e).into()),
116+
}
117+
}
118+
}

rsocket-test/Cargo.toml

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@ env_logger = "0.7.1"
1212
bytes = "0.5.4"
1313
hex = "0.4.2"
1414
rand = "0.7.3"
15+
serde = "1.0.106"
16+
serde_derive = "1.0.106"
1517

1618
[dev-dependencies.rsocket_rust]
1719
path = "../rsocket"
@@ -23,7 +25,10 @@ path = "../rsocket-transport-tcp"
2325
[dev-dependencies.rsocket_rust_transport_websocket]
2426
path = "../rsocket-transport-websocket"
2527

28+
[dev-dependencies.rsocket_rust_messaging]
29+
path = "../rsocket-messaging"
30+
2631
[dev-dependencies.tokio]
27-
version = "0.2.16"
32+
version = "0.2.20"
2833
default-features = false
2934
features = ["full"]

rsocket-test/tests/test_composite_metadata.rs

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,23 @@
11
use bytes::BytesMut;
2-
use rsocket_rust::extension::{CompositeMetadata, Metadata};
2+
use rsocket_rust::extension::{self, CompositeMetadata, CompositeMetadataEntry, MimeType};
33
use rsocket_rust::utils::Writeable;
44

55
#[test]
66
fn encode_and_decode_composite_metadata() {
7-
let bingo = |metadatas: Vec<&Metadata>| {
7+
let bingo = |metadatas: Vec<&CompositeMetadataEntry>| {
88
assert_eq!(2, metadatas.len());
9-
assert_eq!("text/plain", metadatas[0].get_mime());
10-
assert_eq!(b"Hello World!", metadatas[0].get_payload().as_ref());
11-
assert_eq!("application/not_well", metadatas[1].get_mime());
12-
assert_eq!(b"Not Well!", metadatas[1].get_payload().as_ref());
9+
assert_eq!(extension::MIME_TEXT_PLAIN, *metadatas[0].get_mime_type());
10+
assert_eq!("Hello World!", metadatas[0].get_metadata_utf8());
11+
assert_eq!(
12+
MimeType::from("application/not_well"),
13+
*metadatas[1].get_mime_type()
14+
);
15+
assert_eq!(b"Not Well!", metadatas[1].get_metadata().as_ref());
1316
};
1417

1518
let cm = CompositeMetadata::builder()
16-
.push("text/plain", b"Hello World!")
17-
.push("application/not_well", "Not Well!")
19+
.push(MimeType::from("text/plain"), b"Hello World!")
20+
.push(MimeType::from("application/not_well"), "Not Well!")
1821
.build();
1922
bingo(cm.iter().collect());
2023

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
#[macro_use]
2+
extern crate log;
3+
#[macro_use]
4+
extern crate serde_derive;
5+
6+
use rsocket_rust::prelude::*;
7+
use rsocket_rust_messaging::*;
8+
use rsocket_rust_transport_tcp::TcpClientTransport;
9+
10+
#[derive(Serialize, Deserialize, Debug)]
11+
pub struct Student {
12+
id: i64,
13+
name: String,
14+
birth: String,
15+
}
16+
17+
#[derive(Serialize, Deserialize, Debug)]
18+
pub struct Response<T> {
19+
code: i32,
20+
message: Option<String>,
21+
data: T,
22+
}
23+
24+
#[tokio::main]
25+
#[test]
26+
#[ignore]
27+
async fn test_messaging() {
28+
let rsocket = RSocketFactory::connect()
29+
.transport(TcpClientTransport::from("tcp://127.0.0.1:7878"))
30+
.data_mime_type("application/json")
31+
.metadata_mime_type("message/x.rsocket.composite-metadata.v0")
32+
.start()
33+
.await
34+
.expect("Connect failed!");
35+
let requester = Requester::new(rsocket);
36+
37+
let post = Student {
38+
id: 1234,
39+
name: "Jeffsky".to_owned(),
40+
birth: "2020-01-01".to_owned(),
41+
};
42+
let mut req = requester.route("student.v1.upsert");
43+
req.data(&post).unwrap();
44+
let res: Response<Student> = req
45+
.retrieve_mono()
46+
.await
47+
.block()
48+
.expect("Retrieve failed!")
49+
.expect("Empty result!");
50+
println!("------> RESPONSE: {:?}", res);
51+
()
52+
}

rsocket-test/tests/test_mimes.rs

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,12 @@
11
extern crate rsocket_rust;
22

3-
use rsocket_rust::mime::WellKnownMIME;
3+
use rsocket_rust::extension::{self, MimeType};
44

55
#[test]
66
fn test_wellknown() {
7-
let got = WellKnownMIME::from("application/json");
8-
assert_eq!(WellKnownMIME::ApplicationJson, got);
9-
WellKnownMIME::foreach(|m| {
10-
let mut result = WellKnownMIME::from(m.raw());
11-
assert_eq!(m, &result);
12-
result = WellKnownMIME::from(format!("{}", m));
13-
assert_eq!(m, &result);
14-
});
7+
let well = MimeType::from("application/json");
8+
assert_eq!(extension::MIME_APPLICATION_JSON, well);
9+
assert_eq!(0x05, well.as_u8().unwrap());
10+
let custom = MimeType::from("application/custom");
11+
assert_eq!(MimeType::Normal("application/custom".to_owned()), custom);
1512
}

0 commit comments

Comments
 (0)