diff --git a/src/v/kafka/client/direct_consumer/direct_consumer.cc b/src/v/kafka/client/direct_consumer/direct_consumer.cc index 1861f450b75eb..b36428cef8504 100644 --- a/src/v/kafka/client/direct_consumer/direct_consumer.cc +++ b/src/v/kafka/client/direct_consumer/direct_consumer.cc @@ -56,6 +56,7 @@ direct_consumer::update_fetchers(topic_partition_map removals) { co_return; } auto holder = _gate.hold(); + auto subscription_lock_holder = co_await _subscriptions_lock.get_units(); /** * Unassign partitions from fetchers that are no longer needed. */ @@ -153,7 +154,12 @@ ss::future<> direct_consumer::start() { ss::future<> direct_consumer::stop() { _cluster->unregister_metadata_cb(_metadata_callback_id); _fetched_data_queue->stop(); - co_await _gate.close(); + + // close the gate and break the lock together, no need to drain all + // operations + auto f = _gate.close(); + _subscriptions_lock.broken(); + co_await std::move(f); co_await ss::parallel_for_each( _broker_fetchers, [](auto& pair) { return pair.second->stop(); }); diff --git a/src/v/kafka/client/direct_consumer/direct_consumer.h b/src/v/kafka/client/direct_consumer/direct_consumer.h index efc0ec7151318..b136cf62ceffb 100644 --- a/src/v/kafka/client/direct_consumer/direct_consumer.h +++ b/src/v/kafka/client/direct_consumer/direct_consumer.h @@ -144,6 +144,8 @@ class direct_consumer { configuration _config; + // serialize updates to _subscriptions + mutex _subscriptions_lock{"direct_consumer::_subscriptions_lock"}; topic_partition_map _subscriptions; chunked_hash_map> _broker_fetchers; std::unique_ptr _fetched_data_queue; diff --git a/src/v/kafka/client/direct_consumer/fetcher.cc b/src/v/kafka/client/direct_consumer/fetcher.cc index cb762bbbbf39f..4c90f6c5bced6 100644 --- a/src/v/kafka/client/direct_consumer/fetcher.cc +++ b/src/v/kafka/client/direct_consumer/fetcher.cc @@ -529,6 +529,9 @@ fetcher::process_fetch_response( part_data.high_watermark, find_assignment_epoch( topic_data.topic, part_data.partition_id, epochs)); + // if (!updated_offset) { + // continue; + // } if (updated_offset) { dirty_partitions[topic_data.topic].insert( part_data.partition_id); diff --git a/src/v/kafka/client/direct_consumer/tests/direct_consumer_fixture_test.cc b/src/v/kafka/client/direct_consumer/tests/direct_consumer_fixture_test.cc index bc66d91eced91..97af0041d1497 100644 --- a/src/v/kafka/client/direct_consumer/tests/direct_consumer_fixture_test.cc +++ b/src/v/kafka/client/direct_consumer/tests/direct_consumer_fixture_test.cc @@ -106,6 +106,8 @@ class consumer_fixture : public cluster_test_fixture { model::ntp ntp( model::kafka_namespace, topic, model::partition_id(partition)); + logger.info("producing to ntp: {}", ntp); + auto leader_id = get_partition_leader(ntp); vlog( @@ -126,6 +128,35 @@ class consumer_fixture : public cluster_test_fixture { .get(); } + // intentional shadowing + ss::future<> shuffle_leadership(model::ntp ntp) { + logger.info("starting the shuffle on ntp: {}", ntp); + vassert( + !instance_ids().empty(), + "broker instances is not allowed to be empty"); + logger.info("ids validated"); + + // auto leader_broker = get_partition_leader(ntp); + auto [rp_ptr, partition_ptr] = get_leader(ntp); + logger.info("got leader pointers"); + + vassert( + partition_ptr->raft(), + "must have a valid consensus to transfer leadership"); + + logger.info("raft instances are good"); + auto transfer_ec = partition_ptr + ->transfer_leadership( + {.group = partition_ptr->raft()->group()}) + .get(); + + logger.info("transfer completed with ec: {}", transfer_ec); + vassert( + !transfer_ec, "leadership transfer failed with ec: {}", transfer_ec); + + return ss::now(); + } + chunked_hash_map< model::topic_partition, chunked_vector> @@ -283,584 +314,83 @@ TEST_P(basic_consume_fixture, TestBasicConsumption) { model::offset(1019)); } -TEST_P(basic_consume_fixture, TestUnassignPartition) { - assign_partitions(make_assignment(topic, {0, 1})); - - constexpr size_t n = 100; - - // produce some data - for (auto p : std::array{0, 1, 2}) { - produce_to_partition(topic, p, n); - } - - { - auto fetched = fetch_until_empty(*consumer); - ASSERT_EQ(fetched.size(), 2); - ASSERT_EQ( - fetched.find(model::topic_partition{topic, model::partition_id{2}}), - fetched.end()); - for (auto id : std::array{0, 1}) { - ASSERT_EQ( - fetched[model::topic_partition(topic, model::partition_id(id))] - .back() - .last_offset(), - model::offset(n - 1)); - } - } - - unassign_partition(model::topic_partition{topic, model::partition_id{0}}); - - // enable fetch sessions if the test is in toggle mode - maybe_toggle_fetch_sessions(); - - for (auto p : std::array{0, 1, 2}) { - produce_to_partition(topic, p, n); - } - - { - auto fetched = fetch_until_empty(*consumer); - - ASSERT_EQ(fetched.size(), 1); - for (auto p : std::array{0, 2}) { - ASSERT_EQ( - fetched.find( - model::topic_partition{topic, model::partition_id{p}}), - fetched.end()); - } - ASSERT_EQ( - fetched[model::topic_partition(topic, model::partition_id(1))] - .back() - .last_offset(), - model::offset(n * 2 - 1)); - } - - unassign_partition(model::topic_partition{topic, model::partition_id{1}}); - assign_partitions(make_assignment(topic, {0})); - - for (auto p : std::array{0, 1, 2}) { - produce_to_partition(topic, p, n); - } - - { - auto fetched = fetch_until_empty(*consumer); - - ASSERT_EQ(fetched.size(), 1); - for (auto p : std::array{1, 2}) { - ASSERT_EQ( - fetched.find( - model::topic_partition{topic, model::partition_id{p}}), - fetched.end()); - } - ASSERT_EQ( - fetched[model::topic_partition(topic, model::partition_id(0))] - .back() - .last_offset(), - model::offset(n * 3 - 1)); - } -} - -TEST_P(basic_consume_fixture, TestUnassignTopic) { +TEST_P(basic_consume_fixture, TestBasicLeadershipTransfer) { assign_partitions(make_assignment(topic, {0, 1, 2})); - constexpr size_t n = 100; - - // produce some data - for (auto p : std::array{0, 1, 2}) { - produce_to_partition(topic, p, n); - } - - { - auto fetched = fetch_until_empty(*consumer); - ASSERT_EQ(fetched.size(), 3); - for (auto id : std::array{0, 1, 2}) { - ASSERT_EQ( - fetched[model::topic_partition(topic, model::partition_id(id))] - .back() - .last_offset(), - model::offset(n - 1)); - } + // no data should be available immediately, as the topic is empty + for (int i = 0; i < 10; ++i) { + auto fetched = consumer->fetch_next(100ms).get(); + ASSERT_TRUE(fetched.value().empty()); } + // produce some data + produce_to_partition(topic, 0, 1000); + produce_to_partition(topic, 1, 400); + produce_to_partition(topic, 2, 20); - unassign_topic(topic); - - maybe_toggle_fetch_sessions(); - - for (auto p : std::array{0, 1, 2}) { - produce_to_partition(topic, p, n); - } + logger.info("about to fetch"); - { - auto fetched = fetch_until_empty(*consumer); - ASSERT_EQ(fetched.size(), 0); - } -} + auto fetched = fetch_until_empty(*consumer); -TEST_P(basic_consume_fixture, TestBogusPartitionIds) { - // test that providing non-existent or ill formed partition IDs to the - // consumer doesn't cause issues. we wouldn't expect this to happen in - // practice. - assign_partitions(make_assignment(topic, {0, 2, 5, 23, -1})); + logger.info("fetched"); - constexpr int n = 100; + ASSERT_EQ(fetched.size(), 3); + ASSERT_EQ( + fetched[model::topic_partition(topic, model::partition_id(0))] + .back() + .last_offset(), + model::offset(999)); + ASSERT_EQ( + fetched[model::topic_partition(topic, model::partition_id(1))] + .back() + .last_offset(), + model::offset(399)); + ASSERT_EQ( + fetched[model::topic_partition(topic, model::partition_id(2))] + .back() + .last_offset(), + model::offset(19)); - for (auto p : std::array{0, 1, 2}) { - produce_to_partition(topic, p, n); - } + logger.info("getting ready to shuffle"); - { - auto fetched = fetch_until_empty(*consumer); - ASSERT_EQ(fetched.size(), 2); - for (auto id : std::array{0, 2}) { - ASSERT_EQ( - fetched[model::topic_partition(topic, model::partition_id(id))] - .back() - .last_offset(), - model::offset(n - 1)); - } - } + model::ntp to_shuffle{ + model::kafka_namespace, topic, model::partition_id{0}}; - maybe_toggle_fetch_sessions(); + logger.info("shuffling this one {}", to_shuffle); - unassign_partition(model::topic_partition{topic, model::partition_id{5}}); - unassign_partition(model::topic_partition{topic, model::partition_id{42}}); - unassign_topic(model::topic{"noexist"}); + this->shuffle_leadership(to_shuffle).get(); - for (auto p : std::array{0, 1, 2}) { - produce_to_partition(topic, p, n); - } + logger.info("after shuffle"); - { - auto fetched = fetch_until_empty(*consumer); - ASSERT_EQ(fetched.size(), 2); - for (auto id : std::array{0, 2}) { - ASSERT_EQ( - fetched[model::topic_partition(topic, model::partition_id(id))] - .back() - .last_offset(), - model::offset(2 * n - 1)); - } - } + // produce again + // produce_to_partition(topic, 2, 1000); + // produce_to_partition(topic, 1, 400); + produce_to_partition(topic, 0, 20); + auto fetched_2 = fetch_until_empty(*consumer); + ASSERT_EQ(fetched_2.size(), 3); + ASSERT_EQ( + fetched_2[model::topic_partition(topic, model::partition_id(0))] + .back() + .last_offset(), + model::offset(1019)); + // ASSERT_EQ( + // fetched_2[model::topic_partition(topic, model::partition_id(1))] + // .back() + // .last_offset(), + // model::offset(799)); + // ASSERT_EQ( + // fetched_2[model::topic_partition(topic, model::partition_id(2))] + // .back() + // .last_offset(), + // model::offset(1019)); } INSTANTIATE_TEST_SUITE_P( test_with_basic_consume_fixture, basic_consume_fixture, testing::Values( - session_config::with_sessions, - session_config::without_sessions, - session_config::toggle_sessions)); - -namespace { - -constexpr model::offset forget_partition_placeholder = model::offset{-1}; -constexpr int full_fetch_sid = kafka::fetch_session_id{0}; - -struct fetch_capture { - using requests - = std::unordered_map>; - std::map sessions; - requests fetch_offsets; - int total_requests{0}; - int empty_requests{0}; - fmt::iterator format_to(fmt::iterator it) const { - fmt::format_to(it, "\ttotal requests: {}\n", total_requests); - fmt::format_to(it, "\tempty requests: {}\n", empty_requests); - for (const auto& [sid, reqs] : sessions) { - if (sid == full_fetch_sid) { - fmt::format_to(it, "\tfull fetch: \n"); - } else { - fmt::format_to(it, "\tsession {}: \n", sid); - } - for (const auto& [tp, fos] : reqs) { - fmt::format_to( - it, - "\t\t{} ({}): [{}]\n", - tp, - fos.size(), - fmt::join( - fos | std::views::transform([](const model::offset o) { - if (o == forget_partition_placeholder) { - return std::string{"F"}; - } else { - return fmt::format("{}", o); - } - }), - ", ")); - } - } - - return it; - } -}; - -struct cluster_capture { - std::unordered_map captured; - fmt::iterator format_to(fmt::iterator it) const { - it = fmt::format_to(it, "\n"); - for (const auto& [nid, fc] : captured) { - // we don't care about the seed broker - if (nid == unknown_node_id) { - continue; - } - it = fmt::format_to(it, "node {}: {{\n{}}}\n", nid, fc); - } - return it; - } -}; - -} // namespace - -class request_capturing_remote_broker : public broker { -public: - request_capturing_remote_broker(shared_broker_t remote, fetch_capture& cap) - : _fetch_capture(&cap) - , _broker(std::move(remote)) {} - - model::node_id id() const final { return _broker->id(); } - - ss::future<> stop() final { return _broker->stop(); } - - ss::future> get_supported_versions( - kafka::api_key key, - std::optional> as - = std::nullopt) final { - return _broker->get_supported_versions(key, as); - } - - const net::unresolved_address& get_address() const final { - return _broker->get_address(); - } - - ss::future dispatch( - request_t req, - kafka::api_version version, - std::optional> as - = std::nullopt) final { - ss::visit( - req, - [](const auto&) {}, - [this](const kafka::fetch_request& f) { - auto partitions_v - = f.data.topics - | std::views::transform([](const kafka::fetch_topic& t) { - return t.partitions - | std::views::transform( - [](const kafka::fetch_partition& p) { - return fmt::format( - "{{{}, {}}}", - p.partition, - p.fetch_offset); - }); - }) - | std::views::join; - - vlog( - logger.debug, - "[broker: {}] FETCH (session id: {} epoch: {}): [{}]", - id(), - f.data.session_id, - f.data.session_epoch, - fmt::join(partitions_v, ", ")); - - // ignore full fetch requests that initialize a session. the - // contents are less predictable and we only really care about - // semantics w/in and across open sessions - if ( - f.is_full_fetch_request() - && f.data.session_epoch == kafka::initial_fetch_session_epoch) { - return; - } - - _fetch_capture->total_requests++; - if ( - f.data.topics.empty() && f.data.forgotten_topics_data.empty()) { - _fetch_capture->empty_requests++; - } - - kafka::fetch_session_id session_id{f.data.session_id}; - - for (const auto& t : f.data.topics) { - for (const auto& p : t.partitions) { - model::topic_partition tp{t.topic, p.partition}; - _fetch_capture->sessions[session_id][tp].push_back( - p.fetch_offset); - _fetch_capture->fetch_offsets[tp].push_back( - p.fetch_offset); - } - } - for (const auto& t : f.data.forgotten_topics_data) { - for (const auto& p : t.partitions) { - model::topic_partition tp{ - t.topic, model::partition_id{p}}; - _fetch_capture->sessions[session_id][tp].push_back( - forget_partition_placeholder); - _fetch_capture->fetch_offsets[tp].push_back( - forget_partition_placeholder); - } - } - }); - return _broker->dispatch(std::move(req), version, as); - } - -private: - fetch_capture* _fetch_capture; - shared_broker_t _broker; -}; - -/** - * Simple class used to create broker objects. Created broker objects use - * configuration provided when creating the factory. - */ -struct request_capturing_broker_factory : public broker_factory { - explicit request_capturing_broker_factory( - connection_configuration config, cluster_capture& cap) - : _capture(&cap) - , _config(std::move(config)) - , _logger(logger, _config.client_id.value_or("kafka-client")) - , _factory(std::make_unique(_config, _logger)) {} - - ss::future - create_broker(model::node_id id, net::unresolved_address addr) final { - auto remote = co_await _factory->create_broker(id, addr); - auto& cap = _capture->captured[id]; - co_return ss::make_shared( - std::move(remote), cap); - } - -private: - cluster_capture* _capture; - connection_configuration _config; - prefix_logger _logger; - std::unique_ptr _factory; -}; - -class fetch_session_fixture - : public consumer_fixture - , public testing::Test { -public: - void wait_for_leadership() { - for (auto i : std::views::iota(0, n_partitions)) { - get_partition_leader(model::ntp{ - model::kafka_namespace, topic, model::partition_id{i}}); - } - } - - void SetUp() override { - create_node_application(model::node_id{0}); - create_node_application(model::node_id{1}); - create_node_application(model::node_id{2}); - auto* rp = instance(model::node_id{0}); - wait_for_all_members(3s).get(); - rp->add_topic( - {model::kafka_namespace, topic}, n_partitions, std::nullopt, 3) - .get(); - - // the pattern of fetch requests is more predictable if we wait for - // leadership before firing up the consumer - wait_for_leadership(); - - cluster = create_client_cluster( - std::make_unique( - make_connection_config(), _capture)) - .get(); - consumer = std::make_unique( - *cluster, - direct_consumer::configuration{ - .with_sessions = fetch_sessions_enabled::yes}); - consumer->start().get(); - } - - void TearDown() override { - consumer->stop().get(); - cluster->stop().get(); - } - - void validate_sessions(std::function&)> validator) { - for (const auto& [node, cap] : _capture.captured) { - for (const auto& [sid, reqs] : cap.sessions) { - ASSERT_NE(sid, kafka::invalid_fetch_session_id); - for (const auto& [tp, fos] : reqs) { - validator(tp, fos); - } - } - } - } - - cluster_capture _capture{}; - constexpr static int n_partitions{10}; -}; - -TEST_F(fetch_session_fixture, TestFetchRequestContents) { - // This test - // - Assigns some partitions the consumer - // - Produce to all partitions and fetch until empty - // - Check captured fetch requests for each assigned partition with the - // expectation that it's included on when the fetch offset changes. - // - Produce to a subset of assigned partition and perform a similar - // check. Only those with new data should appear in subsequent fetches. - - auto all_partitions = std::views::iota(0, n_partitions); - constexpr int p_assign = 7; - auto initial_assignment = std::views::iota(0, p_assign); - constexpr int p_second_produce = 4; - auto second_produce = std::views::iota(0, p_second_produce); - - assign_partitions(make_assignment( - topic, initial_assignment | std::ranges::to>())); - - for (int i = 0; i < 10; ++i) { - auto fetched = consumer->fetch_next(100ms).get(); - ASSERT_TRUE(fetched.value().empty()); - } - - constexpr int64_t n = 100; - - for (auto i : all_partitions) { - produce_to_partition(topic, i, n); - } - - { - auto fetched = fetch_until_empty(*consumer); - ASSERT_EQ(fetched.size(), initial_assignment.size()); - } - - std::unordered_map sessions_per_partition; - - validate_sessions([&]( - const model::topic_partition& tp, - const std::vector& offsets) { - sessions_per_partition[tp.partition]++; - ASSERT_LT(tp.partition(), p_assign) - << fmt::format("Unexpected request for {}", tp); - ASSERT_EQ( - offsets, - std::vector({model::offset{0}, model::offset{n}})) - << fmt::format("Unexpected offsets for {}: {}", tp, _capture); - }); - - for (auto i : all_partitions) { - auto it = sessions_per_partition.find(model::partition_id{i}); - if (i < p_assign) { - ASSERT_NE(it, sessions_per_partition.end()); - ASSERT_EQ(it->second, 1); - } else { - ASSERT_EQ(it, sessions_per_partition.end()); - } - } - - for (auto i : second_produce) { - produce_to_partition(topic, i, n); - } - - { - auto fetched = fetch_until_empty(*consumer); - ASSERT_EQ(fetched.size(), second_produce.size()); - } - - validate_sessions([&]( - const model::topic_partition& tp, - const std::vector& offsets) { - ASSERT_LT(tp.partition(), p_assign) - << fmt::format("Unexpected request for {}", tp); - - if (tp.partition() < p_second_produce) { - ASSERT_EQ( - offsets, - std::vector( - {model::offset{0}, model::offset{n}, model::offset{n * 2}})) - << fmt::format("Unexpected offsets for {}: {}", tp, _capture); - } else { - ASSERT_EQ( - offsets, - std::vector({model::offset{0}, model::offset{n}})) - << fmt::format("Unexpected offsets for {}: {}", tp, _capture); - } - }); - - vlog(logger.debug, "CAPTURE: {}", _capture); -} - -TEST_F(fetch_session_fixture, TestFetchRequestUnassignContents) { - // similar to the previous test, but this time forget partitions some - // partitions and verify that this is reflected in the subsequent - // incremental fetch request - auto all_partitions = std::views::iota(0, n_partitions); - constexpr int p_assign = 7; - auto initial_assignment = std::views::iota(0, p_assign); - constexpr int p_first_unassign = 4; - auto to_unassign = std::views::iota(p_first_unassign, p_assign); - - wait_for_leadership(); - - assign_partitions(make_assignment( - topic, initial_assignment | std::ranges::to>())); - - for (int i = 0; i < 10; ++i) { - auto fetched = consumer->fetch_next(100ms).get(); - ASSERT_TRUE(fetched.value().empty()); - } - - constexpr int64_t n = 100; - - for (auto i : all_partitions) { - produce_to_partition(topic, i, n); - } - - { - auto fetched = fetch_until_empty(*consumer); - ASSERT_EQ(fetched.size(), initial_assignment.size()); - } - - validate_sessions([&]( - const model::topic_partition& tp, - const std::vector& offsets) { - ASSERT_LT(tp.partition(), p_assign) - << fmt::format("Unexpected request for {}", tp); - ASSERT_EQ( - offsets, - std::vector({model::offset{0}, model::offset{n}})) - << fmt::format("Unexpected offsets for {}: {}", tp, _capture); - }); - - for (auto i : to_unassign) { - unassign_partition( - model::topic_partition{topic, model::partition_id{i}}); - } - - for (auto i : all_partitions) { - produce_to_partition(topic, i, n); - } - - { - auto fetched = fetch_until_empty(*consumer); - ASSERT_EQ(fetched.size(), p_first_unassign); - } - - validate_sessions([&]( - const model::topic_partition& tp, - const std::vector& offsets) { - ASSERT_LT(tp.partition(), p_assign) - << fmt::format("Unexpected request for {}", tp); - if (tp.partition() < p_first_unassign) { - ASSERT_EQ( - offsets, - std::vector( - {model::offset{0}, model::offset{n}, model::offset{n * 2}})) - << fmt::format("Unexpected offsets for {}: {}", tp, _capture); - } else { - ASSERT_EQ( - offsets, - std::vector( - {model::offset{0}, - model::offset{n}, - forget_partition_placeholder})) - << fmt::format("Unexpected offsets for {}: {}", tp, _capture); - } - }); - - vlog(logger.debug, "CAPTURE: {}", _capture); -} + // session_config::with_sessions, + session_config::without_sessions //, + // session_config::toggle_sessions + )); // TODO: inject errors? diff --git a/src/v/kafka/client/direct_consumer/verifier/application.cc b/src/v/kafka/client/direct_consumer/verifier/application.cc index 01c24cbcafe71..77d433560ce43 100644 --- a/src/v/kafka/client/direct_consumer/verifier/application.cc +++ b/src/v/kafka/client/direct_consumer/verifier/application.cc @@ -93,6 +93,13 @@ ss::future<> consumer_runner::do_fetch() { if (fetched_partition_data.data.empty()) { continue; // no data to process } + + vlog( + v_logger.info, + "fetched ntp: {}/{}, no offset yet", + fetched_topic_data.topic, + fetched_partition_data.partition_id); + auto& partition_stats = _stats[fetched_topic_data.topic] [fetched_partition_data.partition_id]; if (fetched_partition_data.error != kafka::error_code::none) { @@ -114,6 +121,14 @@ ss::future<> consumer_runner::do_fetch() { _total_records += totals.count; auto last_fetched_offset = model::offset_cast( fetched_partition_data.data.back().last_offset()); + + vlog( + v_logger.info, + "fetched ntp: {}/{}, offset: {}", + fetched_topic_data.topic, + fetched_partition_data.partition_id, + last_fetched_offset); + if (last_fetched_offset <= partition_stats.last_fetched_offset) { _non_monotonic_fetches++; vlog( diff --git a/tests/rptest/direct_consumer_tests/direct_consumer_test.py b/tests/rptest/direct_consumer_tests/direct_consumer_test.py index 666fe094563aa..a3470a76fb5f6 100644 --- a/tests/rptest/direct_consumer_tests/direct_consumer_test.py +++ b/tests/rptest/direct_consumer_tests/direct_consumer_test.py @@ -9,9 +9,16 @@ from typing import Any +import time +import random +import threading +from typing import Callable + from rptest.services.cluster import cluster +from rptest.services.admin import Admin from ducktape.utils.util import wait_until from ducktape.tests.test import TestContext +from ducktape.errors import TimeoutError from rptest.services.direct_consumer_verifier import ( DirectConsumerVerifier, CreateDirectConsumerRequest, BrokerAddress, @@ -24,15 +31,58 @@ from rptest.util import wait_until_with_progress_check +class LoopThread(threading.Thread): + """Run loop func until graceful cancellation is requested""" + def _loop(self, *args, **kwargs): + while not self.stopped(): + self._loop_func(*args, **kwargs) + + def __init__(self, *args, loop_func: Callable[[], None], **kwargs): + super(LoopThread, self).__init__(*args, target=self._loop, **kwargs) + self._stop_event = threading.Event() + self._loop_func = loop_func + + def stop(self): + self._stop_event.set() + + def stopped(self): + return self._stop_event.is_set() + + #TODO: This test must be enabled once the direct consumer verifier support # is added to vtools. class DirectConsumerVerifierTest(RedpandaTest): def __init__(self, test_context: TestContext, **kwargs: Any): super().__init__(test_context, **kwargs) - @cluster(num_nodes=4) + def shuffle_one_leader(self, topic_spec: TopicSpec) -> None: + '''randomly chooses a partition in the given topic spec and moves its leader one replica over''' + namespace = "kafka" + topic_name = topic_spec.name + partition_number = random.choice(range(topic_spec.partition_count)) + admin = Admin(self.redpanda) + # logs to debug + transfer_succeeded = admin.transfer_leadership_to( + namespace="kafka", + topic=topic_spec.name, + partition=random.randrange(0, topic_spec.partition_count)) + self.redpanda.logger.debug( + f"transfer of ntp {namespace}/{topic_name}/{partition_number} success? {transfer_succeeded}" + ) + + def create_troublemaker_thread(self, + topic_spec: TopicSpec, + thread_name="stream_thread") -> LoopThread: + '''creates a background thread to drive paritition leadership transfers''' + thread = LoopThread(name=thread_name, + loop_func=self.shuffle_one_leader, + args=(topic_spec, )) + return thread + + @cluster(num_nodes=5) def test_basic_consuming_from_topic(self): topic_name = "test-topic" + #msg_count = 2000000 msg_count = 200000 msg_size = 128 client_id = "test-consumer" @@ -43,15 +93,19 @@ def test_basic_consuming_from_topic(self): self.client().create_topic(topic_spec) - KgoVerifierProducer.oneshot(self.test_context, - self.redpanda, - topic_name, - msg_size=msg_size, - msg_count=msg_count) + producer = KgoVerifierProducer(self.test_context, + self.redpanda, + topic_name, + msg_size=msg_size, + msg_count=msg_count) + producer.start() verifier = DirectConsumerVerifier(self.test_context, log_level="DEBUG") verifier.start() + troublemaker = self.create_troublemaker_thread(topic_spec=topic_spec) + troublemaker.start() + try: # check if the verifier is alive verifier.status() @@ -95,8 +149,8 @@ def test_basic_consuming_from_topic(self): def get_consumption(): state = verifier.get_consumer_state(state_request) - self.logger.info( - f"Consumer state: consumed {state.total_consumed_messages} messages" + self.logger.debug( + f"Expecting: {msg_count} Consumer state: consumed {state.total_consumed_messages} messages" ) return state.total_consumed_messages @@ -111,12 +165,20 @@ def get_consumption(): ) final_state = verifier.get_consumer_state(state_request) + + # assertions assert final_state.total_consumed_messages == msg_count, \ f"Expected {msg_count} messages, got {final_state.total_consumed_messages}" + assert int(final_state.non_monotonic_fetches) == 0, \ + f"Non-monotomic fetches found, number of nm fetches: {final_state.non_monotonic_fetches}" + self.logger.info( f"Successfully consumed {final_state.total_consumed_messages} messages" ) finally: + troublemaker.stop() + producer.stop() verifier.stop() + producer.wait(timeout_sec=600)