Skip to content

Commit 50bdcd1

Browse files
committed
Support flux result for messaging requester.
1 parent 3987e2c commit 50bdcd1

File tree

2 files changed

+126
-23
lines changed

2 files changed

+126
-23
lines changed

rsocket-messaging/src/requester.rs

Lines changed: 94 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@ use url::Url;
1919
type FnMetadata = Box<dyn FnMut() -> Result<(MimeType, Vec<u8>), Box<dyn Error>>>;
2020
type FnData = Box<dyn FnMut(&MimeType) -> Result<Vec<u8>, Box<dyn Error>>>;
2121
type PreflightResult = Result<(Payload, MimeType, Arc<Box<dyn RSocket>>), Box<dyn Error>>;
22+
type UnpackerResult = Result<(MimeType, Payload), RSocketError>;
23+
type UnpackersResult = Result<(MimeType, Flux<Result<Payload, RSocketError>>), Box<dyn Error>>;
2224

2325
enum TransportKind {
2426
TCP(String, u16),
@@ -44,6 +46,14 @@ pub struct RequesterBuilder {
4446
tp: Option<TransportKind>,
4547
}
4648

49+
pub struct Unpackers {
50+
inner: UnpackersResult,
51+
}
52+
53+
pub struct Unpacker {
54+
inner: UnpackerResult,
55+
}
56+
4757
impl Default for RequesterBuilder {
4858
fn default() -> Self {
4959
Self {
@@ -263,6 +273,16 @@ impl RequestSpec {
263273
self
264274
}
265275

276+
pub async fn retrieve(self) -> Result<(), Box<dyn Error>> {
277+
match self.preflight() {
278+
Ok((req, _mime_type, rsocket)) => {
279+
rsocket.fire_and_forget(req).await;
280+
Ok(())
281+
}
282+
Err(e) => Err(e),
283+
}
284+
}
285+
266286
pub async fn retrieve_mono(self) -> Unpacker {
267287
match self.preflight() {
268288
Ok((req, mime_type, rsocket)) => {
@@ -284,6 +304,18 @@ impl RequestSpec {
284304
}
285305
}
286306

307+
pub fn retrieve_flux(self) -> Unpackers {
308+
match self.preflight() {
309+
Ok((req, mime_type, rsocket)) => {
310+
let results = rsocket.request_stream(req);
311+
Unpackers {
312+
inner: Ok((mime_type, results)),
313+
}
314+
}
315+
Err(e) => Unpackers { inner: Err(e) },
316+
}
317+
}
318+
287319
#[inline]
288320
fn preflight(self) -> PreflightResult {
289321
let mut b = BytesMut::new();
@@ -304,36 +336,83 @@ impl RequestSpec {
304336
}
305337
}
306338

307-
pub struct Unpacker {
308-
inner: Result<(MimeType, Payload), RSocketError>,
339+
impl Unpackers {
340+
pub async fn block<T>(self) -> Result<Vec<T>, Box<dyn Error>>
341+
where
342+
T: Sized + DeserializeOwned,
343+
{
344+
let (mime_type, mut results) = self.inner?;
345+
let mut res = Vec::new();
346+
while let Some(next) = results.next().await {
347+
match next {
348+
Ok(v) => {
349+
if let Some(data) = v.data() {
350+
let t = do_unmarshal::<T>(&mime_type, data)?;
351+
if let Some(t) = t {
352+
res.push(t);
353+
}
354+
}
355+
}
356+
Err(e) => return Err(format!("{}", e).into()),
357+
}
358+
}
359+
Ok(res)
360+
}
361+
pub async fn foreach<T>(self, callback: impl Fn(T)) -> Result<(), Box<dyn Error>>
362+
where
363+
T: Sized + DeserializeOwned,
364+
{
365+
let (mime_type, mut results) = self.inner?;
366+
while let Some(next) = results.next().await {
367+
match next {
368+
Ok(v) => {
369+
if let Some(data) = v.data() {
370+
let t = do_unmarshal::<T>(&mime_type, data)?;
371+
if let Some(t) = t {
372+
callback(t);
373+
}
374+
}
375+
}
376+
Err(e) => return Err(format!("{}", e).into()),
377+
}
378+
}
379+
Ok(())
380+
}
309381
}
310382

311383
impl Unpacker {
312-
pub fn block<T>(&self) -> Result<Option<T>, Box<dyn Error>>
384+
pub fn block<T>(self) -> Result<Option<T>, Box<dyn Error>>
313385
where
314386
T: Sized + DeserializeOwned,
315387
{
316-
match &self.inner {
388+
match self.inner {
317389
Ok((mime_type, inner)) => match inner.data() {
318390
// TODO: support more mime types.
319-
Some(raw) => match *mime_type {
320-
MimeType::APPLICATION_JSON => {
321-
let t = unmarshal(misc::json(), &raw.as_ref())?;
322-
Ok(Some(t))
323-
}
324-
MimeType::APPLICATION_CBOR => {
325-
let t = unmarshal(misc::cbor(), &raw.as_ref())?;
326-
Ok(Some(t))
327-
}
328-
_ => Err("unsupported mime type!".into()),
329-
},
391+
Some(raw) => do_unmarshal(&mime_type, raw),
330392
None => Ok(None),
331393
},
332394
Err(e) => Err(format!("{}", e).into()),
333395
}
334396
}
335397
}
336398

399+
fn do_unmarshal<T>(mime_type: &MimeType, raw: &Bytes) -> Result<Option<T>, Box<dyn Error>>
400+
where
401+
T: Sized + DeserializeOwned,
402+
{
403+
match *mime_type {
404+
MimeType::APPLICATION_JSON => {
405+
let t = unmarshal(misc::json(), &raw.as_ref())?;
406+
Ok(Some(t))
407+
}
408+
MimeType::APPLICATION_CBOR => {
409+
let t = unmarshal(misc::cbor(), &raw.as_ref())?;
410+
Ok(Some(t))
411+
}
412+
_ => Err("unsupported mime type!".into()),
413+
}
414+
}
415+
337416
fn do_marshal<T>(mime_type: &MimeType, data: &T) -> Result<Vec<u8>, Box<dyn Error>>
338417
where
339418
T: Sized + Serialize,

rsocket-test/tests/test_messaging.rs

Lines changed: 32 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ pub struct Token {
1919
access: String,
2020
}
2121

22-
#[derive(Serialize, Deserialize, Debug, Default)]
22+
#[derive(Serialize, Deserialize, Debug, Default, Clone)]
2323
pub struct Student {
2424
id: i64,
2525
name: String,
@@ -56,21 +56,45 @@ async fn test_messaging() {
5656
.await
5757
.expect("Connect failed!");
5858

59-
let post = Student {
60-
id: 1234,
61-
name: "Jeffsky".to_owned(),
62-
birth: "2020-01-01".to_owned(),
63-
};
64-
59+
// TEST MONO BEGIN
6560
let res: Response<Student> = requester
6661
.route("student.v1.upsert")
6762
.metadata(Tracing::default(), "application/json")
6863
.metadata_raw("foobar", "message/x.rsocket.authentication.bearer.v0")
69-
.data(post)
64+
.data(next_post())
7065
.retrieve_mono()
7166
.await
7267
.block()
7368
.expect("Retrieve failed!")
7469
.expect("Empty result!");
7570
info!("------> RESPONSE: {:?}", res);
71+
72+
// TEST FLUX BEGIN
73+
let res: Vec<Student> = requester
74+
.route("students.v1")
75+
.data(next_post())
76+
.retrieve_flux()
77+
.block()
78+
.await
79+
.expect("Retrieve failed!");
80+
for it in res.iter() {
81+
info!("===> NEXT: {:?}", it);
82+
}
83+
requester
84+
.route("students.v1")
85+
.data(next_post())
86+
.retrieve_flux()
87+
.foreach(|it: Student| {
88+
info!("===> FOREACH: {:?}", it);
89+
})
90+
.await
91+
.expect("Retrieve failed!");
92+
}
93+
94+
fn next_post() -> Student {
95+
Student {
96+
id: 1234,
97+
name: "Jeffsky".to_owned(),
98+
birth: "2020-01-01".to_owned(),
99+
}
76100
}

0 commit comments

Comments
 (0)