@@ -15,7 +15,6 @@ use futures::{Sink, SinkExt, Stream, StreamExt};
1515use std:: future:: Future ;
1616use std:: pin:: Pin ;
1717use std:: sync:: Arc ;
18- use tokio:: prelude:: * ;
1918use tokio:: sync:: { mpsc, oneshot, RwLock } ;
2019
2120#[ derive( Clone ) ]
@@ -95,7 +94,7 @@ impl DuplexSocket {
9594
9695 #[ inline]
9796 async fn loop_canceller ( & self , mut rx : mpsc:: Receiver < u32 > ) {
98- while let Some ( sid) = rx. next ( ) . await {
97+ while let Some ( sid) = rx. recv ( ) . await {
9998 self . handlers . remove ( & sid) ;
10099 }
101100 }
@@ -456,12 +455,16 @@ impl DuplexSocket {
456455 async fn on_request_channel ( & self , sid : u32 , flag : u16 , first : Payload ) {
457456 let responder = self . responder . clone ( ) ;
458457 let tx = self . tx . clone ( ) ;
459- let ( sender, receiver) = mpsc:: channel :: < Result < Payload > > ( 32 ) ;
458+ let ( sender, mut receiver) = mpsc:: channel :: < Result < Payload > > ( 32 ) ;
460459 sender. send ( Ok ( first) ) . await . expect ( "Send failed!" ) ;
461460 self . register_handler ( sid, Handler :: ReqRC ( sender) ) . await ;
462461 runtime:: spawn ( async move {
463462 // respond client channel
464- let mut outputs = responder. request_channel ( Box :: pin ( receiver) ) ;
463+ let mut outputs = responder. request_channel ( Box :: pin ( stream ! {
464+ while let Some ( it) = receiver. recv( ) . await {
465+ yield it;
466+ }
467+ } ) ) ;
465468 // TODO: support custom RequestN.
466469 let request_n = frame:: RequestN :: builder ( sid, 0 ) . build ( ) ;
467470
@@ -784,7 +787,7 @@ impl RSocket for DuplexSocket {
784787 let sid = self . seq . next ( ) ;
785788 let tx = self . tx . clone ( ) ;
786789 // register handler
787- let ( sender, receiver) = mpsc:: channel :: < Result < Payload > > ( 32 ) ;
790+ let ( sender, mut receiver) = mpsc:: channel :: < Result < Payload > > ( 32 ) ;
788791 let handlers = self . handlers . clone ( ) ;
789792 let splitter = self . splitter . clone ( ) ;
790793 runtime:: spawn ( async move {
@@ -843,14 +846,18 @@ impl RSocket for DuplexSocket {
843846 }
844847 }
845848 } ) ;
846- Box :: pin ( receiver)
849+ Box :: pin ( stream ! {
850+ while let Some ( it) = receiver. recv( ) . await {
851+ yield it;
852+ }
853+ } )
847854 }
848855
849856 fn request_channel ( & self , mut reqs : Flux < Result < Payload > > ) -> Flux < Result < Payload > > {
850857 let sid = self . seq . next ( ) ;
851858 let mut tx = self . tx . clone ( ) ;
852859 // register handler
853- let ( sender, receiver) = mpsc:: channel :: < Result < Payload > > ( 32 ) ;
860+ let ( sender, mut receiver) = mpsc:: channel :: < Result < Payload > > ( 32 ) ;
854861 let handlers = self . handlers . clone ( ) ;
855862 let splitter = self . splitter . clone ( ) ;
856863 runtime:: spawn ( async move {
@@ -884,7 +891,11 @@ impl RSocket for DuplexSocket {
884891 error ! ( "complete REQUEST_CHANNEL failed: {}" , e) ;
885892 }
886893 } ) ;
887- Box :: pin ( receiver)
894+ Box :: pin ( stream ! {
895+ while let Some ( it) = receiver. recv( ) . await {
896+ yield it;
897+ }
898+ } )
888899 }
889900}
890901
0 commit comments