Skip to content

Commit 3987e2c

Browse files
committed
Change messaging RequestSpec to fluent api.
1 parent 2676bb4 commit 3987e2c

File tree

5 files changed

+177
-87
lines changed

5 files changed

+177
-87
lines changed

rsocket-messaging/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ edition = "2018"
88
log = "0.4.8"
99
futures = "0.3.5"
1010
bytes = "0.5.4"
11+
lazy_static = "1.4.0"
1112
serde = "1.0.110"
1213
serde_json = "1.0.53"
1314
serde_cbor = "0.11.1"

rsocket-messaging/src/lib.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,10 @@
11
#[macro_use]
22
extern crate log;
3+
// #[macro_use]
4+
// extern crate lazy_static;
35

46
mod misc;
57
mod requester;
68

9+
pub use misc::{cbor, json, SerDe};
710
pub use requester::{RequestSpec, Requester, RequesterBuilder};

rsocket-messaging/src/misc.rs

Lines changed: 64 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -1,38 +1,73 @@
1-
use bytes::{BufMut, BytesMut};
2-
use rsocket_rust::extension::MimeType;
3-
use serde::{Deserialize, Serialize};
1+
use serde::{de::DeserializeOwned, Serialize};
42
use std::error::Error;
53

6-
pub(crate) fn unmarshal<'a, T>(mime_type: &MimeType, raw: &'a [u8]) -> Result<T, Box<dyn Error>>
7-
where
8-
T: Deserialize<'a>,
9-
{
10-
match *mime_type {
11-
MimeType::APPLICATION_JSON => Ok(serde_json::from_slice(raw)?),
12-
MimeType::APPLICATION_CBOR => Ok(serde_cbor::from_slice(raw)?),
13-
_ => panic!(""),
4+
pub trait SerDe {
5+
fn marshal<T>(&self, data: &T) -> Result<Vec<u8>, Box<dyn Error>>
6+
where
7+
Self: Sized,
8+
T: Sized + Serialize;
9+
10+
fn unmarshal<T>(&self, raw: &[u8]) -> Result<T, Box<dyn Error>>
11+
where
12+
Self: Sized,
13+
T: Sized + DeserializeOwned;
14+
}
15+
16+
#[derive(Default)]
17+
struct JsonSerDe {}
18+
19+
impl SerDe for JsonSerDe {
20+
fn marshal<T>(&self, data: &T) -> Result<Vec<u8>, Box<dyn Error>>
21+
where
22+
T: Sized + Serialize,
23+
{
24+
Ok(serde_json::to_vec(data)?)
1425
}
26+
27+
fn unmarshal<T>(&self, raw: &[u8]) -> Result<T, Box<dyn Error>>
28+
where
29+
T: Sized + DeserializeOwned,
30+
{
31+
Ok(serde_json::from_slice(raw)?)
32+
}
33+
}
34+
35+
pub fn json() -> impl SerDe {
36+
JsonSerDe {}
37+
}
38+
39+
pub fn cbor() -> impl SerDe {
40+
CborSerDe {}
1541
}
1642

17-
pub(crate) fn marshal<T>(
18-
mime_type: &MimeType,
19-
bf: &mut BytesMut,
20-
data: &T,
21-
) -> Result<(), Box<dyn Error>>
43+
struct CborSerDe {}
44+
45+
impl SerDe for CborSerDe {
46+
fn marshal<T>(&self, data: &T) -> Result<Vec<u8>, Box<dyn Error>>
47+
where
48+
T: Sized + Serialize,
49+
{
50+
Ok(serde_cbor::to_vec(data)?)
51+
}
52+
53+
fn unmarshal<T>(&self, raw: &[u8]) -> Result<T, Box<dyn Error>>
54+
where
55+
T: Sized + DeserializeOwned,
56+
{
57+
Ok(serde_cbor::from_slice(raw)?)
58+
}
59+
}
60+
61+
pub(crate) fn marshal<T>(ser: impl SerDe, data: &T) -> Result<Vec<u8>, Box<dyn Error>>
2262
where
2363
T: Sized + Serialize,
2464
{
25-
match *mime_type {
26-
MimeType::APPLICATION_JSON => {
27-
let raw = serde_json::to_vec(data)?;
28-
bf.put_slice(&raw[..]);
29-
Ok(())
30-
}
31-
MimeType::APPLICATION_CBOR => {
32-
let raw = serde_cbor::to_vec(data)?;
33-
bf.put_slice(&raw[..]);
34-
Ok(())
35-
}
36-
_ => panic!(""),
37-
}
65+
ser.marshal(data)
66+
}
67+
68+
pub(crate) fn unmarshal<T>(de: impl SerDe, raw: &[u8]) -> Result<T, Box<dyn Error>>
69+
where
70+
T: Sized + DeserializeOwned,
71+
{
72+
de.unmarshal(raw)
3873
}

rsocket-messaging/src/requester.rs

Lines changed: 95 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
1-
use super::misc::{marshal, unmarshal};
2-
use bytes::BytesMut;
1+
use super::misc::{self, marshal, unmarshal};
2+
use bytes::{Bytes, BytesMut};
33
use rsocket_rust::error::RSocketError;
44
use rsocket_rust::extension::{
55
CompositeMetadata, CompositeMetadataEntry, MimeType, RoutingMetadata,
@@ -8,14 +8,18 @@ use rsocket_rust::prelude::*;
88
use rsocket_rust::utils::Writeable;
99
use rsocket_rust_transport_tcp::TcpClientTransport;
1010
use rsocket_rust_transport_websocket::WebsocketClientTransport;
11-
use serde::{Deserialize, Serialize};
11+
use serde::{de::DeserializeOwned, Serialize};
1212
use std::collections::LinkedList;
1313
use std::error::Error;
1414
use std::net::SocketAddr;
1515
use std::result::Result;
1616
use std::sync::Arc;
1717
use url::Url;
1818

19+
type FnMetadata = Box<dyn FnMut() -> Result<(MimeType, Vec<u8>), Box<dyn Error>>>;
20+
type FnData = Box<dyn FnMut(&MimeType) -> Result<Vec<u8>, Box<dyn Error>>>;
21+
type PreflightResult = Result<(Payload, MimeType, Arc<Box<dyn RSocket>>), Box<dyn Error>>;
22+
1923
enum TransportKind {
2024
TCP(String, u16),
2125
WS(String),
@@ -26,10 +30,10 @@ pub struct Requester {
2630
}
2731

2832
pub struct RequestSpec {
29-
data: Option<Vec<u8>>,
3033
rsocket: Arc<Box<dyn RSocket>>,
3134
data_mime_type: MimeType,
32-
metadatas: LinkedList<(MimeType, Vec<u8>)>,
35+
metadatas: LinkedList<FnMetadata>,
36+
data: Option<FnData>,
3337
}
3438

3539
pub struct RequesterBuilder {
@@ -74,14 +78,13 @@ impl RequesterBuilder {
7478
D: Sized + Serialize,
7579
{
7680
// TODO: lazy set
77-
let mut bf = BytesMut::new();
7881
let result = match &self.data_mime_type {
79-
Some(m) => marshal(m, &mut bf, data),
80-
None => marshal(&MimeType::APPLICATION_JSON, &mut bf, data),
82+
Some(m) => do_marshal(m, data),
83+
None => do_marshal(&MimeType::APPLICATION_JSON, data),
8184
};
8285
match result {
83-
Ok(()) => {
84-
self.data = Some(bf.to_vec());
86+
Ok(raw) => {
87+
self.data = Some(raw);
8588
}
8689
Err(e) => {
8790
error!("marshal failed: {:?}", e);
@@ -96,11 +99,10 @@ impl RequesterBuilder {
9699
T: Into<MimeType>,
97100
{
98101
// TODO: lazy set
99-
let mut bf = BytesMut::new();
100102
let mime_type = mime_type.into();
101-
match marshal(&mime_type, &mut bf, metadata) {
102-
Ok(()) => {
103-
let entry = CompositeMetadataEntry::new(mime_type, bf.freeze());
103+
match do_marshal(&mime_type, metadata) {
104+
Ok(raw) => {
105+
let entry = CompositeMetadataEntry::new(mime_type, Bytes::from(raw));
104106
self.metadata.push(entry);
105107
}
106108
Err(e) => error!("marshal failed: {:?}", e),
@@ -203,100 +205,143 @@ impl Requester {
203205
let mut buf = BytesMut::new();
204206
routing.write_to(&mut buf);
205207

206-
let mut metadatas: LinkedList<(MimeType, Vec<u8>)> = Default::default();
207-
metadatas.push_back((MimeType::MESSAGE_X_RSOCKET_ROUTING_V0, buf.to_vec()));
208+
let mut metadatas: LinkedList<FnMetadata> = LinkedList::new();
209+
metadatas.push_back(Box::new(move || {
210+
Ok((MimeType::MESSAGE_X_RSOCKET_ROUTING_V0, buf.to_vec()))
211+
}));
212+
208213
RequestSpec {
209-
data: None,
210214
rsocket: self.rsocket.clone(),
211215
data_mime_type: MimeType::APPLICATION_JSON,
212216
metadatas,
217+
data: None,
213218
}
214219
}
215220
}
216221

217222
impl RequestSpec {
218-
pub fn metadata<T, M>(&mut self, metadata: &T, mime_type: M) -> Result<(), Box<dyn Error>>
223+
pub fn metadata<T, M>(mut self, metadata: T, mime_type: M) -> Self
219224
where
220-
T: Sized + Serialize,
225+
T: Sized + Serialize + 'static,
221226
M: Into<MimeType>,
222227
{
223228
let mime_type = mime_type.into();
224-
let mut b = BytesMut::new();
225-
marshal(&mime_type, &mut b, metadata)?;
226-
self.metadatas.push_back((mime_type, b.to_vec()));
227-
Ok(())
229+
let f: FnMetadata = Box::new(move || {
230+
let raw = do_marshal(&mime_type, &metadata)?;
231+
Ok((mime_type.clone(), raw))
232+
});
233+
self.metadatas.push_back(f);
234+
self
228235
}
229236

230-
pub fn metadata_raw<I, M>(&mut self, metadata: I, mime_type: M) -> Result<(), Box<dyn Error>>
237+
pub fn metadata_raw<I, M>(mut self, metadata: I, mime_type: M) -> Self
231238
where
232239
I: Into<Vec<u8>>,
233240
M: Into<MimeType>,
234241
{
242+
let mime_type = mime_type.into();
243+
let metadata = metadata.into();
235244
self.metadatas
236-
.push_back((mime_type.into(), metadata.into()));
237-
Ok(())
245+
.push_back(Box::new(move || Ok((mime_type.clone(), metadata.clone()))));
246+
self
238247
}
239248

240-
pub fn data<T>(&mut self, data: &T) -> Result<(), Box<dyn Error>>
249+
pub fn data<T>(mut self, data: T) -> Self
241250
where
242-
T: Sized + Serialize,
251+
T: Sized + Serialize + 'static,
243252
{
244-
let mut bf = BytesMut::new();
245-
marshal(&self.data_mime_type, &mut bf, data)?;
246-
self.data = Some(bf.to_vec());
247-
Ok(())
253+
self.data = Some(Box::new(move |m| do_marshal(m, &data)));
254+
self
248255
}
249256

250-
pub fn data_raw<I>(&mut self, data: I) -> Result<(), Box<dyn Error>>
257+
pub fn data_raw<I>(mut self, data: I) -> Self
251258
where
252259
I: Into<Vec<u8>>,
253260
{
254-
self.data = Some(data.into());
255-
Ok(())
261+
let data = data.into();
262+
self.data = Some(Box::new(move |_m| Ok(data.clone())));
263+
self
256264
}
257265

258266
pub async fn retrieve_mono(self) -> Unpacker {
259-
let (req, mime_type, rsocket) = self.preflight();
260-
let res = rsocket.request_response(req).await;
261-
Unpacker {
262-
mime_type,
263-
inner: res,
267+
match self.preflight() {
268+
Ok((req, mime_type, rsocket)) => {
269+
let res = rsocket.request_response(req).await;
270+
match res {
271+
Ok(v) => Unpacker {
272+
inner: Ok((mime_type, v)),
273+
},
274+
Err(e) => Unpacker { inner: Err(e) },
275+
}
276+
}
277+
Err(e) => {
278+
// TODO: better error
279+
let msg = format!("{}", e);
280+
Unpacker {
281+
inner: Err(RSocketError::from(msg)),
282+
}
283+
}
264284
}
265285
}
266286

267287
#[inline]
268-
fn preflight(self) -> (Payload, MimeType, Arc<Box<dyn RSocket>>) {
288+
fn preflight(self) -> PreflightResult {
269289
let mut b = BytesMut::new();
270290
let mut c = CompositeMetadata::builder();
271-
for (mime_type, raw) in self.metadatas.into_iter() {
291+
292+
for mut b in self.metadatas.into_iter() {
293+
let (mime_type, raw) = b()?;
272294
c = c.push(mime_type, raw);
273295
}
274296
c.build().write_to(&mut b);
275297

276298
let mut bu = Payload::builder().set_metadata(b.to_vec());
277-
if let Some(raw) = self.data {
299+
if let Some(mut gen) = self.data {
300+
let raw = gen(&self.data_mime_type)?;
278301
bu = bu.set_data(raw);
279302
}
280-
(bu.build(), self.data_mime_type, self.rsocket)
303+
Ok((bu.build(), self.data_mime_type, self.rsocket))
281304
}
282305
}
283306

284307
pub struct Unpacker {
285-
mime_type: MimeType,
286-
inner: Result<Payload, RSocketError>,
308+
inner: Result<(MimeType, Payload), RSocketError>,
287309
}
288310

289311
impl Unpacker {
290-
pub fn block<'a, T>(&'a self) -> Result<Option<T>, Box<dyn Error>>
312+
pub fn block<T>(&self) -> Result<Option<T>, Box<dyn Error>>
291313
where
292-
T: Deserialize<'a>,
314+
T: Sized + DeserializeOwned,
293315
{
294316
match &self.inner {
295-
Ok(inner) => match inner.data() {
296-
Some(raw) => Ok(Some(unmarshal(&self.mime_type, &raw.as_ref())?)),
317+
Ok((mime_type, inner)) => match inner.data() {
318+
// TODO: support more mime types.
319+
Some(raw) => match *mime_type {
320+
MimeType::APPLICATION_JSON => {
321+
let t = unmarshal(misc::json(), &raw.as_ref())?;
322+
Ok(Some(t))
323+
}
324+
MimeType::APPLICATION_CBOR => {
325+
let t = unmarshal(misc::cbor(), &raw.as_ref())?;
326+
Ok(Some(t))
327+
}
328+
_ => Err("unsupported mime type!".into()),
329+
},
297330
None => Ok(None),
298331
},
299332
Err(e) => Err(format!("{}", e).into()),
300333
}
301334
}
302335
}
336+
337+
fn do_marshal<T>(mime_type: &MimeType, data: &T) -> Result<Vec<u8>, Box<dyn Error>>
338+
where
339+
T: Sized + Serialize,
340+
{
341+
// TODO: support more mime types
342+
match *mime_type {
343+
MimeType::APPLICATION_JSON => marshal(misc::json(), data),
344+
MimeType::APPLICATION_CBOR => marshal(misc::cbor(), data),
345+
_ => Err("unsupported mime type!".into()),
346+
}
347+
}

0 commit comments

Comments
 (0)