diff --git a/src/v/kafka/client/direct_consumer/direct_consumer.cc b/src/v/kafka/client/direct_consumer/direct_consumer.cc
index c918319f9e908..babda0031fd7a 100644
--- a/src/v/kafka/client/direct_consumer/direct_consumer.cc
+++ b/src/v/kafka/client/direct_consumer/direct_consumer.cc
@@ -53,6 +53,7 @@ direct_consumer::update_fetchers(topic_partition_map removals) {
         co_return;
     }
     auto holder = _gate.hold();
+    auto subscription_lock_holder = co_await _subscriptions_lock.get_units();
     /**
      * Unassign partitions from fetchers that are no longer needed.
      */
@@ -149,7 +150,12 @@ ss::future<> direct_consumer::start() {
 ss::future<> direct_consumer::stop() {
     _cluster->unregister_metadata_cb(_metadata_callback_id);
     _fetched_data_queue->stop();
-    co_await _gate.close();
+
+    // close the gate and break the lock together; there is no need to drain
+    // all in-flight operations before breaking the lock
+    auto f = _gate.close();
+    _subscriptions_lock.broken();
+    co_await std::move(f);
 
     co_await ss::parallel_for_each(
       _broker_fetchers, [](auto& pair) { return pair.second->stop(); });
diff --git a/src/v/kafka/client/direct_consumer/direct_consumer.h b/src/v/kafka/client/direct_consumer/direct_consumer.h
index 84ffea7328ee7..20a17e0b1d099 100644
--- a/src/v/kafka/client/direct_consumer/direct_consumer.h
+++ b/src/v/kafka/client/direct_consumer/direct_consumer.h
@@ -13,6 +13,7 @@
 #include "container/chunked_hash_map.h"
 #include "kafka/client/cluster.h"
 #include "kafka/client/direct_consumer/api_types.h"
+#include "utils/mutex.h"
 
 namespace kafka {
 struct metadata_response_data;
@@ -143,6 +144,9 @@ class direct_consumer {
     configuration _config;
 
     topic_partition_map _subscriptions;
+    // serialize updates to _subscriptions
+    mutex _subscriptions_lock{"direct_consumer::_subscriptions_lock"};
+
     chunked_hash_map<model::node_id, std::unique_ptr<fetcher>> _broker_fetchers;
     std::unique_ptr _fetched_data_queue;
     ss::condition_variable _data_available;
diff --git a/src/v/kafka/client/direct_consumer/fetcher.cc b/src/v/kafka/client/direct_consumer/fetcher.cc
index 641c783655e50..c358a3167849c 100644
--- a/src/v/kafka/client/direct_consumer/fetcher.cc
+++ b/src/v/kafka/client/direct_consumer/fetcher.cc
@@ -130,17 +130,43 @@ bool fetcher::maybe_update_fetch_offset(
   model::partition_id partition_id,
   kafka::offset last_received,
   assignment_epoch assignment_epoch) {
+    // let's see if this is running
+    vlog(
+      logger().info,
+      "[broker: {}] {}/{} maybe_update_fetch_offset assignment_epoch: {}",
+      _id,
+      topic,
+      partition_id,
+      assignment_epoch);
+
     auto t_it = _partitions.find(topic);
+    // topic not found
     if (t_it == _partitions.end()) {
+        vlog(
+          logger().info,
+          "[broker: {}] {}/{} topic not found",
+          _id,
+          topic,
+          partition_id);
         return false;
     }
+
     auto p_it = t_it->second.find(partition_id);
+    // partition not found
    if (p_it == t_it->second.end()) {
+        vlog(
+          logger().info,
+          "[broker: {}] {}/{} partition not found",
+          _id,
+          topic,
+          partition_id);
         return false;
     }
+
+    // assignment mismatch
     if (p_it->second.assignment_epoch != assignment_epoch) {
         vlog(
-          logger().trace,
+          logger().info,
           "[broker: {}] Ignoring {}/{} reply, assignment epoch changed, "
           "request epoch: {}, current epoch: {}",
           _id,
@@ -151,8 +177,9 @@ bool fetcher::maybe_update_fetch_offset(
         return false;
     }
 
+    // success
     vlog(
-      logger().trace,
+      logger().info,
       "[broker: {}] Updating {}/{} fetch offset to {}",
       _id,
       topic,
@@ -345,12 +372,18 @@ fetcher::process_fetch_response(
             part_data.data = co_await reader_to_chunked_vector(
               std::move(part_response.records.value()));
 
-            maybe_update_fetch_offset(
+            const bool was_updated = maybe_update_fetch_offset(
               topic_data.topic,
               part_data.partition_id,
               model::offset_cast(part_data.data.back().last_offset()),
               find_assignment_epoch(
                 topic_data.topic, part_data.partition_id, epochs));
+
+            // if the offset was not updated (epoch changed or the partition
+            // is missing), do not put this data on the queue
+            if (!was_updated) {
+                continue;
+            }
         }
         topic_data.partitions.push_back(std::move(part_data));
     }
@@ -472,6 +505,7 @@ ss::future fetcher::maybe_initialise_fetch_offsets(
               partition_offset.offset,
               request_epoch,
               p_it->second.assignment_epoch);
+            // TODO: continue?
         }
         vlog(
           logger().info,
@@ -541,14 +575,17 @@ fetcher::unassign_partition(model::topic_partition_view tp_v) {
         // partition not found, nothing to unassign
         co_return std::nullopt;
     }
+    auto fetch_offset = p_it->second.fetch_offset;
+
     vlog(
       logger().debug,
-      "[broker: {}] Removing partition: {} assignment",
+      "[broker: {}] Removing partition: {} assignment with offset: {}",
       _id,
-      tp_v);
+      tp_v,
+      fetch_offset);
+
     // TODO: for fetch sessions we will need to mark partitions for deletion so
     // they can be removed from the fetch session
-    auto fetch_offset = p_it->second.fetch_offset;
     partitions.erase(p_it);
     if (partitions.empty()) {
         // if there are no partitions left for this topic, remove the topic
diff --git a/src/v/kafka/client/direct_consumer/verifier/application.cc b/src/v/kafka/client/direct_consumer/verifier/application.cc
index 01c24cbcafe71..dc0c9742886f8 100644
--- a/src/v/kafka/client/direct_consumer/verifier/application.cc
+++ b/src/v/kafka/client/direct_consumer/verifier/application.cc
@@ -114,6 +114,14 @@ ss::future<> consumer_runner::do_fetch() {
             _total_records += totals.count;
             auto last_fetched_offset = model::offset_cast(
               fetched_partition_data.data.back().last_offset());
+
+            vlog(
+              v_logger.trace,
+              "fetched ntp: {}/{}, offset: {}",
+              fetched_topic_data.topic,
+              fetched_partition_data.partition_id,
+              last_fetched_offset);
+
             if (last_fetched_offset <= partition_stats.last_fetched_offset) {
                 _non_monotonic_fetches++;
                 vlog(
diff --git a/tests/rptest/direct_consumer_tests/direct_consumer_test.py b/tests/rptest/direct_consumer_tests/direct_consumer_test.py
index 92c75adca5c26..43dfa3ed0254a 100644
--- a/tests/rptest/direct_consumer_tests/direct_consumer_test.py
+++ b/tests/rptest/direct_consumer_tests/direct_consumer_test.py
@@ -9,30 +9,77 @@
 
 from typing import Any
 
+import random
+import threading
+from typing import Callable
+
 from rptest.services.cluster import cluster
+from rptest.services.admin import Admin
 from ducktape.utils.util import wait_until
 from ducktape.tests.test import TestContext
 from rptest.services.direct_consumer_verifier import (
     DirectConsumerVerifier, CreateDirectConsumerRequest, BrokerAddress,
     DirectConsumerConfiguration, OffsetResetPolicy, IsolationLevel,
     AssignPartitionsRequest, TopicAssignment, PartitionAssignment,
     GetConsumerStateRequest)
+from rptest.utils.node_operations import FailureInjectorBackgroundThread
 from rptest.services.kgo_verifier_services import KgoVerifierProducer
 from rptest.clients.types import TopicSpec
 from rptest.tests.redpanda_test import RedpandaTest
 
 
+class LoopThread(threading.Thread):
+    """Run loop_func repeatedly until graceful cancellation is requested."""
+    def _loop(self, *args, **kwargs):
+        while not self.stopped():
+            self._loop_func(*args, **kwargs)
+
+    def __init__(self, *args, loop_func: Callable[..., None], **kwargs):
+        super().__init__(*args, target=self._loop, **kwargs)
+        self._stop_event = threading.Event()
+        self._loop_func = loop_func
+
+    def stop(self):
+        self._stop_event.set()
+
+    def stopped(self):
+        return self._stop_event.is_set()
+
+
 #TODO: This test must be enabled once the direct consumer verifier support
 # is added to vtools.
 class DirectConsumerVerifierTest(RedpandaTest):
     def __init__(self, test_context: TestContext, **kwargs: Any):
         super().__init__(test_context, **kwargs)
 
-    @cluster(num_nodes=4)
+    def shuffle_one_leader(self, topic_spec: TopicSpec) -> None:
+        '''randomly chooses a partition in the given topic spec and transfers its leadership to another replica'''
+        namespace = "kafka"
+        topic_name = topic_spec.name
+        partition_number = random.randrange(topic_spec.partition_count)
+        admin = Admin(self.redpanda)
+        transfer_succeeded = admin.transfer_leadership_to(
+            namespace=namespace,
+            topic=topic_name,
+            partition=partition_number)
+        self.redpanda.logger.debug(
+            f"transfer of ntp {namespace}/{topic_name}/{partition_number} success? {transfer_succeeded}"
+        )
+
+    def create_troublemaker_thread(self,
+                                   topic_spec: TopicSpec,
+                                   thread_name="stream_thread") -> LoopThread:
+        '''creates a background thread to drive partition leadership transfers'''
+        thread = LoopThread(name=thread_name,
+                            loop_func=self.shuffle_one_leader,
+                            args=(topic_spec, ))
+        return thread
+
+    @cluster(num_nodes=5)
     def test_basic_consuming_from_topic(self):
         topic_name = "test-topic"
-        msg_count = 200000
+        msg_count = 2000000
         msg_size = 128
         client_id = "test-consumer"
@@ -42,15 +89,22 @@
 
         self.client().create_topic(topic_spec)
 
-        KgoVerifierProducer.oneshot(self.test_context,
-                                    self.redpanda,
-                                    topic_name,
-                                    msg_size=msg_size,
-                                    msg_count=msg_count)
+        producer = KgoVerifierProducer(self.test_context,
+                                       self.redpanda,
+                                       topic_name,
+                                       msg_size=msg_size,
+                                       msg_count=msg_count)
+        producer.start()
 
         verifier = DirectConsumerVerifier(self.test_context, log_level="DEBUG")
         verifier.start()
 
+        troublemaker = self.create_troublemaker_thread(topic_spec=topic_spec)
+        troublemaker.start()
+
+        #failure_injector = FailureInjectorBackgroundThread(self.redpanda, self.logger, max_inter_failure_time=10)
+        #failure_injector.start()
+
         try:
             # check if the verifier is alive
             verifier.status()
@@ -92,22 +146,31 @@
             state_request = GetConsumerStateRequest(
                 client_id=client_id, include_partition_states=True)
 
-            def check_consumption():
+            def check_consumption() -> bool:
                 state = verifier.get_consumer_state(state_request)
-                self.logger.info(
-                    f"Consumer state: consumed {state.total_consumed_messages} messages"
+                self.logger.debug(
+                    f"expecting {msg_count} messages; consumer state: consumed {state.total_consumed_messages} messages"
                 )
                 return state.total_consumed_messages >= msg_count
 
-            wait_until(check_consumption, timeout_sec=60, backoff_sec=2)
+            wait_until(check_consumption, timeout_sec=600, backoff_sec=2)
 
             final_state = verifier.get_consumer_state(state_request)
+
+            # assertions
             assert final_state.total_consumed_messages == msg_count, \
                 f"Expected {msg_count} messages, got {final_state.total_consumed_messages}"
 
+            assert int(final_state.non_monotonic_fetches) == 0, \
+                f"Non-monotonic fetches found: {final_state.non_monotonic_fetches}"
+
             self.logger.info(
                 f"Successfully consumed {final_state.total_consumed_messages} messages"
            )
         finally:
+            #failure_injector.stop()
+            troublemaker.stop()
+            producer.stop()
             verifier.stop()
+            producer.wait(timeout_sec=600)
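
Note on the shutdown ordering in direct_consumer::stop(): below is a minimal,
hedged sketch of the gate/lock pattern the patch introduces. It assumes only
Seastar's public gate and semaphore APIs (Redpanda's mutex in utils/mutex.h
wraps a semaphore and adds a name); the example_consumer type and its members
are illustrative, not the actual Redpanda classes.

#include <seastar/core/coroutine.hh>
#include <seastar/core/gate.hh>
#include <seastar/core/semaphore.hh>

class example_consumer {
public:
    seastar::future<> update() {
        // throws gate_closed_exception once stop() has called close()
        auto holder = _gate.hold();
        // throws broken_semaphore once the lock has been broken, so a
        // fiber parked here is woken up instead of waiting forever
        auto units = co_await seastar::get_units(_lock, 1);
        // ... mutate subscriptions under the lock ...
    }

    seastar::future<> stop() {
        // Order matters: start closing the gate first so no new fiber can
        // enter, then break the lock so fibers already queued behind it are
        // woken, and only then wait for the gate to drain. Breaking the lock
        // before draining is what keeps stop() from deadlocking behind a
        // fiber that holds the gate while it waits for the lock.
        auto closed = _gate.close();
        _lock.broken();
        co_await std::move(closed);
    }

private:
    seastar::gate _gate;
    seastar::semaphore _lock{1};
};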