@@ -32,7 +32,7 @@ use tokio::{
3232 mpsc:: { self , unbounded_channel, UnboundedReceiver , UnboundedSender } ,
3333 oneshot,
3434 } ,
35- task:: { self , JoinError , JoinSet } ,
35+ task:: { self , JoinError , JoinHandle , JoinSet } ,
3636 time:: { self , sleep, MissedTickBehavior } ,
3737} ;
3838use tokio_stream:: wrappers:: UnboundedReceiverStream ;
@@ -61,14 +61,16 @@ pub async fn start_consumer(
6161 . expect ( "Can't subscribe to specified topics" ) ;
6262
6363 handle_os_signals ( event_sender. clone ( ) ) ;
64- poll_consumer_client ( consumer. clone ( ) , client_shutdown_receiver) ;
64+ let rdkafka_driver = poll_consumer_client ( consumer. clone ( ) , client_shutdown_receiver) ;
6565 handle_events (
6666 consumer,
6767 event_receiver,
6868 client_shutdown_sender,
6969 spawn_actors,
7070 )
71- . await
71+ . await ?;
72+ rdkafka_driver. await ?;
73+ Ok ( ( ) )
7274}
7375
7476pub fn handle_os_signals ( event_sender : UnboundedSender < ( Event , SyncSender < ( ) > ) > ) {
@@ -85,23 +87,28 @@ pub fn handle_os_signals(event_sender: UnboundedSender<(Event, SyncSender<()>)>)
8587pub fn poll_consumer_client (
8688 consumer : Arc < StreamConsumer < KafkaContext > > ,
8789 shutdown : oneshot:: Receiver < ( ) > ,
88- ) {
90+ ) -> JoinHandle < ( ) > {
8991 task:: spawn_blocking ( || {
9092 Handle :: current ( ) . block_on ( async move {
9193 let _guard = elegant_departure:: get_shutdown_guard ( ) . shutdown_on_drop ( ) ;
9294 select ! {
9395 biased;
94- _ = shutdown => {
95- debug!( "Received shutdown signal, commiting state in sync mode..." ) ;
96- let _ = consumer. commit_consumer_state( rdkafka:: consumer:: CommitMode :: Sync ) ;
97- }
96+ _ = shutdown => { }
9897 msg = consumer. recv( ) => {
9998 error!( "Got unexpected message from consumer client: {:?}" , msg) ;
10099 }
101- }
100+
101+ } ;
102+
103+ select ! {
104+ biased;
105+ _ = consumer. recv( ) => { }
106+ _ = sleep( Duration :: from_secs( 8 ) ) => { }
107+ } ;
108+
102109 debug ! ( "Shutdown complete" ) ;
103110 } ) ;
104- } ) ;
111+ } )
105112}
106113
107114#[ derive( Debug ) ]
@@ -118,8 +125,20 @@ impl KafkaContext {
118125impl ClientContext for KafkaContext { }
119126
120127impl ConsumerContext for KafkaContext {
121- #[ instrument( skip_all) ]
122- fn pre_rebalance ( & self , _: & BaseConsumer < Self > , rebalance : & Rebalance ) {
128+ #[ instrument( skip( self , base_consumer) ) ]
129+ fn pre_rebalance ( & self , base_consumer : & BaseConsumer < Self > , rebalance : & Rebalance ) {
130+ if let Rebalance :: Assign ( tpl) = rebalance {
131+ if tpl. count ( ) == 0 {
132+ return ;
133+ }
134+ }
135+ base_consumer
136+ . pause (
137+ & base_consumer
138+ . assignment ( )
139+ . expect ( "Unable to fetch assigned TPL" ) ,
140+ )
141+ . expect ( "Unable to pause consumer" ) ;
123142 let ( rendezvous_sender, rendezvous_receiver) = sync_channel ( 0 ) ;
124143 match rebalance {
125144 Rebalance :: Assign ( tpl) => {
@@ -149,6 +168,31 @@ impl ConsumerContext for KafkaContext {
149168 }
150169 }
151170
171+ #[ instrument( skip( self , base_consumer) ) ]
172+ fn post_rebalance ( & self , base_consumer : & BaseConsumer < Self > , rebalance : & Rebalance ) {
173+ if let Rebalance :: Assign ( tpl) = rebalance {
174+ if tpl. count ( ) == 0 {
175+ return ;
176+ }
177+ }
178+ let assignment = base_consumer
179+ . assignment ( )
180+ . expect ( "Failed to get assigned TPL" ) ;
181+ if assignment. count ( ) != 0 {
182+ base_consumer
183+ . seek_partitions (
184+ base_consumer
185+ . committed ( rdkafka:: util:: Timeout :: Never )
186+ . expect ( "Failed to get commited TPL" ) ,
187+ rdkafka:: util:: Timeout :: Never ,
188+ )
189+ . expect ( "Failed to seek to commited offset" ) ;
190+ base_consumer
191+ . resume ( & assignment)
192+ . expect ( "Failed to resume consumer" ) ;
193+ }
194+ }
195+
152196 #[ instrument( skip( self ) ) ]
153197 fn commit_callback ( & self , result : KafkaResult < ( ) > , _offsets : & TopicPartitionList ) {
154198 debug ! ( "Got commit callback" ) ;
@@ -336,7 +380,7 @@ pub async fn handle_events(
336380
337381 let mut state = ConsumerState :: Ready ;
338382
339- while let ConsumerState :: Ready { .. } | ConsumerState :: Consuming { .. } = state {
383+ while let ConsumerState :: Ready | ConsumerState :: Consuming { .. } = state {
340384 select ! {
341385 res = match state {
342386 ConsumerState :: Consuming ( ref mut handles, _) => Either :: Left ( handles. join_next( ) ) ,
@@ -352,8 +396,8 @@ pub async fn handle_events(
352396 } ;
353397 info!( "Received event: {:?}" , event) ;
354398 state = match ( state, event) {
355- ( ConsumerState :: Ready , Event :: Assign ( assigned ) ) => {
356- ConsumerState :: Consuming ( spawn_actors( consumer. clone( ) , & assigned ) , assigned )
399+ ( ConsumerState :: Ready , Event :: Assign ( tpl ) ) => {
400+ ConsumerState :: Consuming ( spawn_actors( consumer. clone( ) , & tpl ) , tpl )
357401 }
358402 ( ConsumerState :: Ready , Event :: Revoke ( _) ) => {
359403 unreachable!( "Got partition revocation before the consumer has started" )
@@ -364,17 +408,17 @@ pub async fn handle_events(
364408 tpl. is_disjoint( & assigned) ,
365409 "Newly assigned TPL should be disjoint from TPL we're consuming from"
366410 ) ;
367- tpl. append( & mut assigned) ;
368411 debug!(
369412 "{} additional topic partitions added after assignment" ,
370413 assigned. len( )
371414 ) ;
415+ tpl. append( & mut assigned) ;
372416 handles. shutdown( CALLBACK_DURATION ) . await ;
373417 ConsumerState :: Consuming ( spawn_actors( consumer. clone( ) , & tpl) , tpl)
374418 }
375419 ( ConsumerState :: Consuming ( handles, mut tpl) , Event :: Revoke ( revoked) ) => {
376420 assert!(
377- tpl . is_subset( & revoked ) ,
421+ revoked . is_subset( & tpl ) ,
378422 "Revoked TPL should be a subset of TPL we're consuming from"
379423 ) ;
380424 tpl. retain( |e| !revoked. contains( e) ) ;
@@ -750,7 +794,7 @@ impl CommitClient for StreamConsumer<KafkaContext> {
750794 }
751795}
752796
753- #[ derive( Default ) ]
797+ #[ derive( Default , Debug ) ]
754798struct HighwaterMark {
755799 data : HashMap < ( String , i32 ) , i64 > ,
756800}
@@ -795,6 +839,7 @@ pub async fn commit(
795839 while let Some ( msgs) = receiver. recv ( ) . await {
796840 let mut highwater_mark = HighwaterMark :: new ( ) ;
797841 msgs. 0 . iter ( ) . for_each ( |msg| highwater_mark. track ( msg) ) ;
842+ debug ! ( "Store: {:?}" , highwater_mark) ;
798843 consumer. store_offsets ( & highwater_mark. into ( ) ) . unwrap ( ) ;
799844 }
800845 debug ! ( "Shutdown complete" ) ;
0 commit comments