@@ -359,32 +359,15 @@ pub async fn handle_events(
359359 unreachable!( "Got partition revocation before the consumer has started" )
360360 }
361361 ( ConsumerState :: Ready , Event :: Shutdown ) => ConsumerState :: Stopped ,
362- ( ConsumerState :: Consuming ( handles, mut tpl) , Event :: Assign ( mut assigned) ) => {
363- assert!(
364- tpl. is_disjoint( & assigned) ,
365- "Newly assigned TPL should be disjoint from TPL we're consuming from"
366- ) ;
367- tpl. append( & mut assigned) ;
368- debug!(
369- "{} additional topic partitions added after assignment" ,
370- assigned. len( )
371- ) ;
372- handles. shutdown( CALLBACK_DURATION ) . await ;
373- ConsumerState :: Consuming ( spawn_actors( consumer. clone( ) , & tpl) , tpl)
362+ ( ConsumerState :: Consuming ( _, _) , Event :: Assign ( _) ) => {
363+ unreachable!( "Got partition assignment after the consumer has started" )
374364 }
375- ( ConsumerState :: Consuming ( handles , mut tpl) , Event :: Revoke ( revoked) ) => {
365+ ( ConsumerState :: Consuming ( _ , tpl) , Event :: Revoke ( revoked) ) => {
376366 assert!(
377- tpl. is_subset ( & revoked) ,
378- "Revoked TPL should be a subset of TPL we're consuming from"
367+ tpl == revoked,
368+ "Revoked TPL should be equal to the subset of TPL we're consuming from"
379369 ) ;
380- tpl. retain( |e| !revoked. contains( e) ) ;
381- debug!( "{} topic partitions remaining after revocation" , tpl. len( ) ) ;
382- handles. shutdown( CALLBACK_DURATION ) . await ;
383- if tpl. is_empty( ) {
384- ConsumerState :: Ready
385- } else {
386- ConsumerState :: Consuming ( spawn_actors( consumer. clone( ) , & tpl) , tpl)
387- }
370+ ConsumerState :: Ready
388371 }
389372 ( ConsumerState :: Consuming ( handles, _) , Event :: Shutdown ) => {
390373 handles. shutdown( CALLBACK_DURATION ) . await ;
0 commit comments