@@ -197,21 +197,27 @@ where
197197 }
198198 }
199199
200- // TODO: support fragmentation
201200 async fn join_frame ( & self , input : Frame ) -> Option < Frame > {
202- if !input. is_followable ( ) {
201+ let ( is_follow, is_payload) = input. is_followable_or_payload ( ) ;
202+ if !is_follow {
203203 return Some ( input) ;
204204 }
205205 let sid = input. get_stream_id ( ) ;
206206 let mut joiners = self . joiners . lock ( ) . await ;
207207
208208 if input. get_flag ( ) & frame:: FLAG_FOLLOW != 0 {
209+ // TODO: check conflict
209210 ( * joiners)
210211 . entry ( sid)
211212 . or_insert_with ( Joiner :: new)
212213 . push ( input) ;
213214 return None ;
214215 }
216+
217+ if !is_payload {
218+ return Some ( input) ;
219+ }
220+
215221 match ( * joiners) . remove ( & sid) {
216222 None => Some ( input) ,
217223 Some ( mut joiner) => {
@@ -266,6 +272,10 @@ where
266272
267273 #[ inline]
268274 async fn on_error ( & self , sid : u32 , flag : u16 , input : frame:: Error ) {
275+ {
276+ let mut joiners = self . joiners . lock ( ) . await ;
277+ ( * joiners) . remove ( & sid) ;
278+ }
269279 // pick handler
270280 let mut handlers = self . handlers . lock ( ) . await ;
271281 if let Some ( handler) = ( * handlers) . remove ( & sid) {
@@ -282,6 +292,10 @@ where
282292
283293 #[ inline]
284294 async fn on_cancel ( & self , sid : u32 , _flag : u16 ) {
295+ {
296+ let mut joiners = self . joiners . lock ( ) . await ;
297+ ( * joiners) . remove ( & sid) ;
298+ }
285299 let mut handlers = self . handlers . lock ( ) . await ;
286300 if let Some ( handler) = ( * handlers) . remove ( & sid) {
287301 let e = Err ( RSocketError :: from ( ErrorKind :: Cancelled ( ) ) ) ;
@@ -343,32 +357,6 @@ where
343357 }
344358 Entry :: Vacant ( v) => warn ! ( "invalid payload id {}: no such request!" , sid) ,
345359 }
346-
347- // match (*handlers).remove(&sid).unwrap() {
348- // Handler::ReqRR(sender) => sender.send(Ok(input)).unwrap(),
349- // Handler::ResRR(c) => unreachable!(),
350- // Handler::ReqRS(sender) => {
351- // if flag & frame::FLAG_NEXT != 0 {
352- // sender
353- // .unbounded_send(Ok(input))
354- // .expect("Send payload response failed.");
355- // }
356- // if flag & frame::FLAG_COMPLETE == 0 {
357- // (*handlers).insert(sid, Handler::ReqRS(sender));
358- // }
359- // }
360- // Handler::ReqRC(sender) => {
361- // // TODO: support channel
362- // if flag & frame::FLAG_NEXT != 0 {
363- // sender
364- // .unbounded_send(Ok(input))
365- // .expect("Send payload response failed");
366- // }
367- // if flag & frame::FLAG_COMPLETE == 0 {
368- // (*handlers).insert(sid, Handler::ReqRC(sender));
369- // }
370- // }
371- // };
372360 }
373361
374362 #[ inline]
0 commit comments