diff --git a/python/integration_tests/test_consumer_rebalancing.py b/python/integration_tests/test_consumer_rebalancing.py index 0a7a95cb..49c9d0db 100644 --- a/python/integration_tests/test_consumer_rebalancing.py +++ b/python/integration_tests/test_consumer_rebalancing.py @@ -45,7 +45,7 @@ def manage_consumer( ) process.send_signal(signal.SIGINT) try: - return_code = process.wait(timeout=10) + return_code = process.wait(timeout=30) assert return_code == 0 except Exception: process.kill() @@ -210,8 +210,13 @@ def test_tasks_written_once_during_rebalancing() -> None: for log_line_index, line in enumerate(lines): if "[31mERROR" in line: # If there is an error in log file, capture 10 lines before and after the error line - consumer_error_logs.append(f"Error found in consumer_{i}. Logging 10 lines before and after the error line:") - for j in range(max(0, log_line_index - 10), min(len(lines) - 1, log_line_index + 10)): + consumer_error_logs.append( + f"Error found in consumer_{i}. Logging 10 lines before and after the error line:" + ) + for j in range( + max(0, log_line_index - 10), + min(len(lines) - 1, log_line_index + 10), + ): consumer_error_logs.append(lines[j].strip()) consumer_error_logs.append("") diff --git a/src/config.rs b/src/config.rs index 0cbf7eac..0b81f354 100644 --- a/src/config.rs +++ b/src/config.rs @@ -140,6 +140,7 @@ impl Config { "session.timeout.ms", self.kafka_session_timeout_ms.to_string(), ) + .set("partition.assignment.strategy", "cooperative-sticky") .set("enable.partition.eof", "false") .set("enable.auto.commit", "true") .set( diff --git a/src/consumer/kafka.rs b/src/consumer/kafka.rs index 5f7a310b..40a33fde 100644 --- a/src/consumer/kafka.rs +++ b/src/consumer/kafka.rs @@ -32,7 +32,7 @@ use tokio::{ mpsc::{self, unbounded_channel, UnboundedReceiver, UnboundedSender}, oneshot, }, - task::{self, JoinError, JoinSet}, + task::{self, JoinError, JoinHandle, JoinSet}, time::{self, sleep, MissedTickBehavior}, }; use tokio_stream::wrappers::UnboundedReceiverStream; @@ -61,14 +61,16 @@ pub async fn start_consumer( .expect("Can't subscribe to specified topics"); handle_os_signals(event_sender.clone()); - poll_consumer_client(consumer.clone(), client_shutdown_receiver); + let rdkafka_driver = poll_consumer_client(consumer.clone(), client_shutdown_receiver); handle_events( consumer, event_receiver, client_shutdown_sender, spawn_actors, ) - .await + .await?; + rdkafka_driver.await?; + Ok(()) } pub fn handle_os_signals(event_sender: UnboundedSender<(Event, SyncSender<()>)>) { @@ -85,23 +87,28 @@ pub fn handle_os_signals(event_sender: UnboundedSender<(Event, SyncSender<()>)>) pub fn poll_consumer_client( consumer: Arc>, shutdown: oneshot::Receiver<()>, -) { +) -> JoinHandle<()> { task::spawn_blocking(|| { Handle::current().block_on(async move { let _guard = elegant_departure::get_shutdown_guard().shutdown_on_drop(); select! { biased; - _ = shutdown => { - debug!("Received shutdown signal, commiting state in sync mode..."); - let _ = consumer.commit_consumer_state(rdkafka::consumer::CommitMode::Sync); - } + _ = shutdown => {} msg = consumer.recv() => { error!("Got unexpected message from consumer client: {:?}", msg); } - } + + }; + + select! { + biased; + _ = consumer.recv() => {} + _ = sleep(Duration::from_secs(30)) => {} + }; + debug!("Shutdown complete"); }); - }); + }) } #[derive(Debug)] @@ -118,8 +125,20 @@ impl KafkaContext { impl ClientContext for KafkaContext {} impl ConsumerContext for KafkaContext { - #[instrument(skip_all)] - fn pre_rebalance(&self, _: &BaseConsumer, rebalance: &Rebalance) { + #[instrument(skip(self, base_consumer))] + fn pre_rebalance(&self, base_consumer: &BaseConsumer, rebalance: &Rebalance) { + if let Rebalance::Assign(tpl) = rebalance { + if tpl.count() == 0 { + return; + } + } + base_consumer + .pause( + &base_consumer + .assignment() + .expect("Unable to fetch assigned TPL"), + ) + .expect("Unable to pause consumer"); let (rendezvous_sender, rendezvous_receiver) = sync_channel(0); match rebalance { Rebalance::Assign(tpl) => { @@ -149,6 +168,31 @@ impl ConsumerContext for KafkaContext { } } + #[instrument(skip(self, base_consumer))] + fn post_rebalance(&self, base_consumer: &BaseConsumer, rebalance: &Rebalance) { + if let Rebalance::Assign(tpl) = rebalance { + if tpl.count() == 0 { + return; + } + } + let assignment = base_consumer + .assignment() + .expect("Failed to get assigned TPL"); + if assignment.count() != 0 { + base_consumer + .seek_partitions( + base_consumer + .committed(rdkafka::util::Timeout::Never) + .expect("Failed to get commited TPL"), + rdkafka::util::Timeout::Never, + ) + .expect("Failed to seek to commited offset"); + base_consumer + .resume(&assignment) + .expect("Failed to resume consumer"); + } + } + #[instrument(skip(self))] fn commit_callback(&self, result: KafkaResult<()>, _offsets: &TopicPartitionList) { debug!("Got commit callback"); @@ -336,7 +380,7 @@ pub async fn handle_events( let mut state = ConsumerState::Ready; - while let ConsumerState::Ready { .. } | ConsumerState::Consuming { .. } = state { + while let ConsumerState::Ready | ConsumerState::Consuming { .. } = state { select! { res = match state { ConsumerState::Consuming(ref mut handles, _) => Either::Left(handles.join_next()), @@ -352,20 +396,30 @@ pub async fn handle_events( }; info!("Received event: {:?}", event); state = match (state, event) { - (ConsumerState::Ready, Event::Assign(assigned)) => { - ConsumerState::Consuming(spawn_actors(consumer.clone(), &assigned), assigned) + (ConsumerState::Ready, Event::Assign(tpl)) => { + ConsumerState::Consuming(spawn_actors(consumer.clone(), &tpl), tpl) } (ConsumerState::Ready, Event::Revoke(_)) => { unreachable!("Got partition revocation before the consumer has started") } (ConsumerState::Ready, Event::Shutdown) => ConsumerState::Stopped, - (ConsumerState::Consuming(_, _), Event::Assign(_)) => { - unreachable!("Got partition assignment after the consumer has started") + (ConsumerState::Consuming(handles, mut tpl), Event::Assign(mut assigned)) => { + assert!( + tpl.is_disjoint(&assigned), + "Newly assigned TPL should be disjoint from TPL we're consuming from" + ); + debug!( + "{} additional topic partitions added after assignment", + assigned.len() + ); + tpl.append(&mut assigned); + handles.shutdown(CALLBACK_DURATION).await; + ConsumerState::Consuming(spawn_actors(consumer.clone(), &tpl), tpl) } (ConsumerState::Consuming(handles, tpl), Event::Revoke(revoked)) => { assert!( - tpl == revoked, - "Revoked TPL should be equal to the subset of TPL we're consuming from" + revoked.is_subset(&tpl), + "Revoked TPL should be a subset of TPL we're consuming from" ); handles.shutdown(CALLBACK_DURATION).await; ConsumerState::Ready @@ -734,7 +788,7 @@ impl CommitClient for StreamConsumer { } } -#[derive(Default)] +#[derive(Default, Debug)] struct HighwaterMark { data: HashMap<(String, i32), i64>, } @@ -779,6 +833,7 @@ pub async fn commit( while let Some(msgs) = receiver.recv().await { let mut highwater_mark = HighwaterMark::new(); msgs.0.iter().for_each(|msg| highwater_mark.track(msg)); + debug!("Store: {:?}", highwater_mark); consumer.store_offsets(&highwater_mark.into()).unwrap(); } debug!("Shutdown complete");