Skip to content

Commit 5e4b5b2

Browse files
committed
feat(request_response): handle empty response correctly
1 parent 29754e0 commit 5e4b5b2

File tree

6 files changed

+38
-19
lines changed

6 files changed

+38
-19
lines changed

rsocket-messaging/src/requester.rs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ use url::Url;
1515
type FnMetadata = Box<dyn FnMut() -> Result<(MimeType, Vec<u8>)>>;
1616
type FnData = Box<dyn FnMut(&MimeType) -> Result<Vec<u8>>>;
1717
type PreflightResult = Result<(Payload, MimeType, Arc<Box<dyn RSocket>>)>;
18-
type UnpackerResult = Result<(MimeType, Payload)>;
18+
type UnpackerResult = Result<(MimeType, Option<Payload>)>;
1919
type UnpackersResult = Result<(MimeType, Flux<Result<Payload>>)>;
2020

2121
enum TransportKind {
@@ -352,8 +352,11 @@ impl Unpacker {
352352
T: Sized + DeserializeOwned,
353353
{
354354
let (mime_type, inner) = self.inner?;
355-
match inner.data() {
356-
Some(raw) => do_unmarshal(&mime_type, raw),
355+
match inner {
356+
Some(it) => match it.data() {
357+
Some(raw) => do_unmarshal(&mime_type, raw),
358+
None => Ok(None),
359+
},
357360
None => Ok(None),
358361
}
359362
}

rsocket-transport-wasm/src/misc.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,10 +73,11 @@ impl JsClient {
7373
let request: JsPayload = request.into_serde().unwrap();
7474
future_to_promise(async move {
7575
match inner.request_response(request.into()).await {
76-
Ok(v) => {
76+
Ok(Some(v)) => {
7777
let jp = JsPayload::from(v);
7878
Ok((&jp).into())
7979
}
80+
Ok(None) => Ok(JsValue::UNDEFINED),
8081
Err(e) => Err(JsValue::from(&format!("{:?}", e))),
8182
}
8283
})

rsocket/src/core/client.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -217,7 +217,7 @@ impl RSocket for Client {
217217
self.socket.fire_and_forget(req).await
218218
}
219219

220-
async fn request_response(&self, req: Payload) -> Result<Payload> {
220+
async fn request_response(&self, req: Payload) -> Result<Option<Payload>> {
221221
self.socket.request_response(req).await
222222
}
223223

rsocket/src/spi.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ pub type Flux<T> = Pin<Box<dyn Send + Stream<Item = T>>>;
1818
pub trait RSocket: Sync + Send {
1919
async fn metadata_push(&self, req: Payload) -> Result<()>;
2020
async fn fire_and_forget(&self, req: Payload) -> Result<()>;
21-
async fn request_response(&self, req: Payload) -> Result<Payload>;
21+
async fn request_response(&self, req: Payload) -> Result<Option<Payload>>;
2222
fn request_stream(&self, req: Payload) -> Flux<Result<Payload>>;
2323
fn request_channel(&self, reqs: Flux<Result<Payload>>) -> Flux<Result<Payload>>;
2424
}

rsocket/src/transport/socket.rs

Lines changed: 25 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ struct Responder {
3636

3737
#[derive(Debug)]
3838
enum Handler {
39-
ReqRR(oneshot::Sender<Result<Payload>>),
39+
ReqRR(oneshot::Sender<Result<Option<Payload>>>),
4040
ResRR(Counter),
4141
ReqRS(mpsc::Sender<Result<Payload>>),
4242
ReqRC(mpsc::Sender<Result<Payload>>),
@@ -261,12 +261,12 @@ impl DuplexSocket {
261261
// pick handler
262262
if let Some((_, handler)) = self.handlers.remove(&sid) {
263263
let desc = input.get_data_utf8().unwrap().to_owned();
264-
let e: Result<_> = Err(RSocketError::must_new_from_code(input.get_code(), desc).into());
264+
let e = RSocketError::must_new_from_code(input.get_code(), desc);
265265
match handler {
266-
Handler::ReqRR(tx) => tx.send(e).expect("Send RR failed"),
266+
Handler::ReqRR(tx) => tx.send(Err(e.into())).expect("Send RR failed"),
267267
Handler::ResRR(_) => unreachable!(),
268-
Handler::ReqRS(tx) => tx.send(e).await.expect("Send RS failed"),
269-
Handler::ReqRC(tx) => tx.send(e).await.expect("Send RC failed"),
268+
Handler::ReqRS(tx) => tx.send(Err(e.into())).await.expect("Send RS failed"),
269+
Handler::ReqRC(tx) => tx.send(Err(e.into())).await.expect("Send RC failed"),
270270
}
271271
}
272272
}
@@ -303,7 +303,11 @@ impl DuplexSocket {
303303
match o.get() {
304304
Handler::ReqRR(_) => match o.remove() {
305305
Handler::ReqRR(sender) => {
306-
sender.send(Ok(input)).unwrap();
306+
if flag & Frame::FLAG_NEXT != 0 {
307+
sender.send(Ok(Some(input))).unwrap();
308+
} else {
309+
sender.send(Ok(None)).unwrap();
310+
}
307311
}
308312
_ => unreachable!(),
309313
},
@@ -394,7 +398,7 @@ impl DuplexSocket {
394398
canceller.send(sid).await.expect("Send canceller failed");
395399

396400
match result {
397-
Ok(res) => {
401+
Ok(Some(res)) => {
398402
Self::try_send_payload(
399403
&splitter,
400404
&mut tx,
@@ -404,6 +408,9 @@ impl DuplexSocket {
404408
)
405409
.await;
406410
}
411+
Ok(None) => {
412+
Self::try_send_complete(&mut tx, sid, Frame::FLAG_COMPLETE).await;
413+
}
407414
Err(e) => {
408415
let sending = frame::Error::builder(sid, 0)
409416
.set_code(error::ERR_APPLICATION)
@@ -570,6 +577,14 @@ impl DuplexSocket {
570577
}
571578
}
572579

580+
#[inline]
581+
async fn try_send_complete(tx: &mut mpsc::Sender<Frame>, sid: u32, flag: u16) {
582+
let sending = frame::Payload::builder(sid, flag).build();
583+
if let Err(e) = tx.send(sending).await {
584+
error!("respond failed: {}", e);
585+
}
586+
}
587+
573588
#[inline]
574589
async fn try_send_payload(
575590
splitter: &Option<Splitter>,
@@ -697,8 +712,8 @@ impl RSocket for DuplexSocket {
697712
Ok(())
698713
}
699714

700-
async fn request_response(&self, req: Payload) -> Result<Payload> {
701-
let (tx, rx) = oneshot::channel::<Result<Payload>>();
715+
async fn request_response(&self, req: Payload) -> Result<Option<Payload>> {
716+
let (tx, rx) = oneshot::channel::<Result<Option<Payload>>>();
702717
let sid = self.seq.next();
703718
let handlers = self.handlers.clone();
704719
let sender = self.tx.clone();
@@ -911,7 +926,7 @@ impl RSocket for Responder {
911926
(*inner).fire_and_forget(req).await
912927
}
913928

914-
async fn request_response(&self, req: Payload) -> Result<Payload> {
929+
async fn request_response(&self, req: Payload) -> Result<Option<Payload>> {
915930
let inner = self.inner.read().await;
916931
(*inner).request_response(req).await
917932
}

rsocket/src/utils.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,9 +26,9 @@ impl RSocket for EchoRSocket {
2626
Ok(())
2727
}
2828

29-
async fn request_response(&self, req: Payload) -> Result<Payload> {
29+
async fn request_response(&self, req: Payload) -> Result<Option<Payload>> {
3030
info!("{:?}", req);
31-
Ok(req)
31+
Ok(Some(req))
3232
}
3333

3434
fn request_stream(&self, req: Payload) -> Flux<Result<Payload>> {
@@ -67,7 +67,7 @@ impl RSocket for EmptyRSocket {
6767
Err(RSocketError::ApplicationException("UNIMPLEMENT".into()).into())
6868
}
6969

70-
async fn request_response(&self, _req: Payload) -> Result<Payload> {
70+
async fn request_response(&self, _req: Payload) -> Result<Option<Payload>> {
7171
Err(RSocketError::ApplicationException("UNIMPLEMENT".into()).into())
7272
}
7373

0 commit comments

Comments
 (0)