From 5f9066375dc5fd3c6fae5d9d6dabde1e4842a65e Mon Sep 17 00:00:00 2001 From: joe-redpanda Date: Fri, 31 Oct 2025 14:52:04 -0700 Subject: [PATCH 1/4] direct_consumer: move offset update logic to fetch A fetch is potentially stale until the point in time at which it is returned to the consumer in fetch_next. This pr moves the update for offsets to only after the final subscription epoch filter has been applied to guarantee correctness of the provided offsets. --- .../client/direct_consumer/direct_consumer.cc | 115 +++++++++++++----- .../client/direct_consumer/direct_consumer.h | 14 ++- src/v/kafka/client/direct_consumer/fetcher.cc | 89 ++++++++------ 3 files changed, 150 insertions(+), 68 deletions(-) diff --git a/src/v/kafka/client/direct_consumer/direct_consumer.cc b/src/v/kafka/client/direct_consumer/direct_consumer.cc index a370893e65cc5..a1f2b3cdbe17d 100644 --- a/src/v/kafka/client/direct_consumer/direct_consumer.cc +++ b/src/v/kafka/client/direct_consumer/direct_consumer.cc @@ -17,6 +17,7 @@ #include "ssx/future-util.h" #include +#include namespace kafka::client { @@ -51,6 +52,7 @@ direct_consumer::fetch_next(std::chrono::milliseconds timeout) { // the remainder is synchronous, no lock is needed auto& response_to_filter = maybe_response_to_filter.value(); filter_stale_subscriptions(response_to_filter); + update_start_offsets(response_to_filter); co_return maybe_response_to_filter; } catch (ss::condition_variable_timed_out&) { @@ -59,7 +61,7 @@ direct_consumer::fetch_next(std::chrono::milliseconds timeout) { } void direct_consumer::filter_stale_subscriptions( - chunked_vector& responses_to_filter) { + chunked_vector& responses_to_filter) const { // for each topic, remove stale partitions for (auto& topic_data : responses_to_filter) { auto& partition_data = topic_data.partitions; @@ -103,6 +105,58 @@ void direct_consumer::filter_stale_subscriptions( responses_to_filter.end() - non_empty_subsegment.size()); } +void direct_consumer::update_start_offsets( + const 
chunked_vector& fetched_data) { + for (const auto& topic_data : fetched_data) { + for (const auto& partition_data : topic_data.partitions) { + // we have already filtered the map, we know these will be found + auto maybe_subscription = find_subscription( + topic_data.topic, partition_data.partition_id); + vassert( + maybe_subscription.has_value(), + "invoke only after filtering, all subscriptions should be " + "consistent"); + auto& subscription = maybe_subscription->get(); + source_partition_offsets spo{ + .log_start_offset = partition_data.start_offset, + .high_watermark = partition_data.high_watermark, + .last_stable_offset = partition_data.last_stable_offset, + .last_offset_update_timestamp = ss::lowres_clock::now()}; + if ( + subscription.last_known_source_offsets.log_start_offset + > spo.log_start_offset) { + vlog( + _cluster->logger().error, + "log start offset should never move backward, current: {}, " + "found: {}", + subscription.last_known_source_offsets.log_start_offset, + spo.log_start_offset); + } + if ( + subscription.last_known_source_offsets.high_watermark + > spo.high_watermark) { + vlog( + _cluster->logger().warn, + "high watermark should not normally move backward, current: " + "{}, found: {}", + subscription.last_known_source_offsets.high_watermark, + spo.high_watermark); + } + if ( + subscription.last_known_source_offsets.last_stable_offset + > spo.last_stable_offset) { + vlog( + _cluster->logger().error, + "last known stable should never move backward, " + "current: {}, found: {}", + subscription.last_known_source_offsets.last_stable_offset, + spo.last_stable_offset); + } + subscription.last_known_source_offsets = spo; + } + } +} + void direct_consumer::update_configuration(configuration cfg) { vlog( _cluster->logger().info, @@ -118,15 +172,10 @@ void direct_consumer::update_configuration(configuration cfg) { std::optional direct_consumer::get_source_offsets(model::topic_partition_view tp) const { - auto it = _subscriptions.find(tp.topic); - 
if (it == _subscriptions.end()) { - return std::nullopt; - } - auto p_it = it->second.find(tp.partition); - if (p_it == it->second.end()) { - return std::nullopt; - } - return p_it->second.last_known_source_offsets; + return find_subscription(tp.topic, tp.partition) + .transform([](std::reference_wrapper sub) { + return sub.get().last_known_source_offsets; + }); } ss::future<> direct_consumer::update_fetchers( @@ -231,7 +280,24 @@ fetcher& direct_consumer::get_fetcher(model::node_id id) { return *it->second; } -std::optional direct_consumer::find_subscription_epoch( +std::optional> +direct_consumer::find_subscription( + const model::topic& topic, model::partition_id partition_id) const { + auto t_it = _subscriptions.find(topic); + if (t_it == _subscriptions.end()) { + return std::nullopt; + } + + auto& p_map = t_it->second; + auto p_it = p_map.find(partition_id); + if (p_it == p_map.end()) { + return std::nullopt; + } + return p_it->second; +} + +std::optional> +direct_consumer::find_subscription( const model::topic& topic, model::partition_id partition_id) { auto t_it = _subscriptions.find(topic); if (t_it == _subscriptions.end()) { @@ -243,7 +309,14 @@ std::optional direct_consumer::find_subscription_epoch( if (p_it == p_map.end()) { return std::nullopt; } - return p_it->second.subscription_epoch; + return p_it->second; +} + +std::optional direct_consumer::find_subscription_epoch( + const model::topic& topic, model::partition_id partition_id) const { + return find_subscription(topic, partition_id).transform([](auto sub) { + return sub.get().subscription_epoch; + }); } ss::future<> direct_consumer::start() { @@ -350,24 +423,6 @@ void direct_consumer::on_metadata_update(const metadata_update&) { ssx::spawn_with_gate(_gate, [this] { return handle_metadata_update(); }); } -void direct_consumer::maybe_update_source_partition_offsets( - model::topic_partition_view tp, source_partition_offsets offsets) { - auto it = _subscriptions.find(tp.topic); - if (it == 
_subscriptions.end()) { - return; - } - auto p_it = it->second.find(tp.partition); - if (p_it == it->second.end()) { - return; - } - auto& sub = p_it->second; - if ( - offsets.last_offset_update_timestamp - > sub.last_known_source_offsets.last_offset_update_timestamp) { - sub.last_known_source_offsets = offsets; - } -} - direct_consumer::~direct_consumer() = default; std::ostream& diff --git a/src/v/kafka/client/direct_consumer/direct_consumer.h b/src/v/kafka/client/direct_consumer/direct_consumer.h index cfce483d13a43..8ad87700c187e 100644 --- a/src/v/kafka/client/direct_consumer/direct_consumer.h +++ b/src/v/kafka/client/direct_consumer/direct_consumer.h @@ -170,14 +170,20 @@ class direct_consumer { fetcher& get_fetcher(model::node_id id); - std::optional find_subscription_epoch( + std::optional> find_subscription( + const model::topic& topic, model::partition_id partition_id) const; + + std::optional> find_subscription( const model::topic& topic, model::partition_id partition_id); + std::optional find_subscription_epoch( + const model::topic& topic, model::partition_id partition_id) const; + void filter_stale_subscriptions( - chunked_vector& responses_to_filter); + chunked_vector& responses_to_filter) const; - void maybe_update_source_partition_offsets( - model::topic_partition_view tp, source_partition_offsets offsets); + void update_start_offsets( + const chunked_vector& fetched_data); cluster* _cluster; diff --git a/src/v/kafka/client/direct_consumer/fetcher.cc b/src/v/kafka/client/direct_consumer/fetcher.cc index 91e5935f80d4e..a80bb93502836 100644 --- a/src/v/kafka/client/direct_consumer/fetcher.cc +++ b/src/v/kafka/client/direct_consumer/fetcher.cc @@ -550,27 +550,17 @@ fetcher::process_fetch_response( dirty_partitions[topic_data.topic].insert( part_data.partition_id); } else { - source_partition_offsets offsets{ - .log_start_offset = model::offset_cast( - part_response.log_start_offset), - .high_watermark = model::offset_cast( - 
part_response.high_watermark), - .last_stable_offset = model::offset_cast( - part_response.last_stable_offset), - .last_offset_update_timestamp = ss::lowres_clock::now(), - }; - part_data.start_offset = offsets.log_start_offset; - part_data.high_watermark = offsets.high_watermark; - part_data.last_stable_offset = offsets.last_stable_offset; + part_data.start_offset = model::offset_cast( + part_response.log_start_offset); + part_data.high_watermark = model::offset_cast( + part_response.high_watermark); + part_data.last_stable_offset = model::offset_cast( + part_response.last_stable_offset); part_data.leader_epoch = part_response.current_leader.leader_epoch; part_data.aborted_transactions = std::move( part_response.aborted_transactions); - _parent->maybe_update_source_partition_offsets( - {topic_data.topic, part_response.partition_index}, - std::move(offsets)); - vlog( logger().trace, "[broker: {}] topic: {}, partition fetch response: {}", @@ -578,28 +568,59 @@ fetcher::process_fetch_response( topic_data.topic, part_response); + // if no records, just emplace the offset updates + // if records, handle size calculation and fetch offsets updates if ( !part_response.records.has_value() || part_response.records->is_end_of_stream()) { - continue; - } - auto partition_response_size - = part_response.records->size_bytes(); - part_data.size_bytes = partition_response_size; - topic_data.total_bytes += partition_response_size; - part_data.data = co_await reader_to_chunked_vector( - std::move(part_response.records.value())); - - bool updated_offset = maybe_update_fetch_offset( - topic_data.topic, - part_data.partition_id, - model::offset_cast(part_data.data.back().last_offset()), - part_data.high_watermark); - if (!updated_offset) { - continue; + // these still go on the queue as they probably contain + // information about a prefix truncation + vlog( + logger().info, + "[broker: {}] tp: {}/{}, received recordless response", + _id, + topic_data.topic, + part_data.partition_id); 
+ } else { + // from here, there is actual data to process, render it + // accordingly + auto partition_response_size + = part_response.records->size_bytes(); + part_data.size_bytes = partition_response_size; + topic_data.total_bytes += partition_response_size; + part_data.data = co_await reader_to_chunked_vector( + std::move(part_response.records.value())); + + bool updated_offset = maybe_update_fetch_offset( + topic_data.topic, + part_data.partition_id, + model::offset_cast(part_data.data.back().last_offset()), + part_data.high_watermark); + if (!updated_offset) { + // This implies a mistake in the fetch logic. A response + // that is + // 1. consistent + // 2. record bearing + // 3. redundant + // should not occur and can be considered a + // non-monotonic fetch + vlog( + logger().error, + "[broker: {}] tp: {}/{} received a record bearing " + "fetch " + "that did not update fetch offsets", + _id, + topic_data.topic, + part_data.partition_id); + // record will still go on the queue, but with records + // emptied in case it contains a start offset update + topic_data.total_bytes -= partition_response_size; + part_data.size_bytes = 0u; + part_data.data.clear(); + } + dirty_partitions[topic_data.topic].insert( + part_data.partition_id); } - dirty_partitions[topic_data.topic].insert( - part_data.partition_id); } topic_data.partitions.push_back(std::move(part_data)); } From 6580f280e02b42440ea42cf9bb2b576052d3fe61 Mon Sep 17 00:00:00 2001 From: joe-redpanda Date: Tue, 4 Nov 2025 09:56:24 -0800 Subject: [PATCH 2/4] cache --- .../kafka/client/direct_consumer/tests/BUILD | 3 + .../tests/direct_consumer_fixture.cc | 8 ++- .../tests/direct_consumer_fixture_test.cc | 68 +++++++++++-------- 3 files changed, 49 insertions(+), 30 deletions(-) diff --git a/src/v/kafka/client/direct_consumer/tests/BUILD b/src/v/kafka/client/direct_consumer/tests/BUILD index 19330d3281477..25bf33f4d1559 100644 --- a/src/v/kafka/client/direct_consumer/tests/BUILD +++ 
b/src/v/kafka/client/direct_consumer/tests/BUILD @@ -69,6 +69,9 @@ redpanda_cc_gtest( "direct_consumer_fixture_test.cc", ], cpu = 1, + args = [ + "--logger-log-level=kvstore=info:cluster=info:storage-resources=info:storage=info:syschecks=info:r/heartbeat=info:rpc=info:raft=info" + ], deps = [ ":direct_consumer_fixture", "//src/v/model", diff --git a/src/v/kafka/client/direct_consumer/tests/direct_consumer_fixture.cc b/src/v/kafka/client/direct_consumer/tests/direct_consumer_fixture.cc index d0d9c67d04323..45a7d0fb6afd0 100644 --- a/src/v/kafka/client/direct_consumer/tests/direct_consumer_fixture.cc +++ b/src/v/kafka/client/direct_consumer/tests/direct_consumer_fixture.cc @@ -291,8 +291,14 @@ consumer_fixture::fetch_until_empty(direct_consumer& consumer) { chunked_vector> ret; + auto timeout = 1s; + auto deadline = ss::lowres_clock::now() + timeout; + while (true) { - auto fetched = consumer.fetch_next(1000ms).get(); + if (ss::lowres_clock::now() > deadline) { + break; + } + auto fetched = consumer.fetch_next(100ms).get(); if (fetched.value().empty()) { break; diff --git a/src/v/kafka/client/direct_consumer/tests/direct_consumer_fixture_test.cc b/src/v/kafka/client/direct_consumer/tests/direct_consumer_fixture_test.cc index 1dfc3f24ad224..e884dcce42c87 100644 --- a/src/v/kafka/client/direct_consumer/tests/direct_consumer_fixture_test.cc +++ b/src/v/kafka/client/direct_consumer/tests/direct_consumer_fixture_test.cc @@ -22,23 +22,39 @@ using BasicConsumerFixture = kafka::client::tests::basic_consumer_fixture; namespace { ss::logger logger{"direct-consumer-test"}; + +// remove all fetches with only offset updates +chunked_hash_map> +filter_offset_only( + chunked_hash_map< + model::topic_partition, + chunked_vector>&& fetch) { + chunked_hash_map< + model::topic_partition, + chunked_vector> + ret; + for (auto& [k, v] : fetch) { + if (v.empty()) { + continue; + } + ret.emplace(k, std::move(v)); + } + return ret; } +} // namespace TEST_P(BasicConsumerFixture, 
TestBasicConsumption) { assign_partitions(make_assignment(topic, {0, 1, 2})); // no data should be available immediately, as the topic is empty - for (int i = 0; i < 10; ++i) { - auto fetched = consumer->fetch_next(100ms).get(); - ASSERT_TRUE(fetched.value().empty()); - } + ASSERT_TRUE(filter_offset_only(fetch_until_empty(*consumer)).empty()); // produce some data produce_to_partition(topic, 0, 1000).get(); produce_to_partition(topic, 1, 400).get(); produce_to_partition(topic, 2, 20).get(); - auto fetched = fetch_until_empty(*consumer); + auto fetched = filter_offset_only(fetch_until_empty(*consumer)); ASSERT_EQ(fetched.size(), 3); ASSERT_EQ( @@ -61,7 +77,7 @@ TEST_P(BasicConsumerFixture, TestBasicConsumption) { produce_to_partition(topic, 2, 1000).get(); produce_to_partition(topic, 1, 400).get(); produce_to_partition(topic, 0, 20).get(); - auto fetched_2 = fetch_until_empty(*consumer); + auto fetched_2 = filter_offset_only(fetch_until_empty(*consumer)); ASSERT_EQ(fetched_2.size(), 3); ASSERT_EQ( fetched_2[model::topic_partition(topic, model::partition_id(0))] @@ -95,7 +111,7 @@ TEST_P(BasicConsumerFixture, TestBasicLeadershipTransfer) { .get(); { // fist fetch and assert - auto fetched = fetch_until_empty(*consumer); + auto fetched = filter_offset_only(fetch_until_empty(*consumer)); ASSERT_EQ( fetched[model::topic_partition(topic, test_partition_id)] @@ -111,7 +127,7 @@ TEST_P(BasicConsumerFixture, TestBasicLeadershipTransfer) { .get(); { // second fetch and assert - auto fetched = fetch_until_empty(*consumer); + auto fetched = filter_offset_only(fetch_until_empty(*consumer)); ASSERT_EQ(fetched.size(), 1); ASSERT_EQ( @@ -137,7 +153,7 @@ TEST_P(BasicConsumerFixture, TestBasicNodeRestart) { .get(); { // fist fetch and assert - auto fetched = fetch_until_empty(*consumer); + auto fetched = filter_offset_only(fetch_until_empty(*consumer)); ASSERT_EQ( fetched[model::topic_partition(topic, test_partition_id)] @@ -168,7 +184,7 @@ TEST_P(BasicConsumerFixture, 
TestBasicNodeRestart) { .get(); { // second fetch and assert - auto fetched = fetch_until_empty(*consumer); + auto fetched = filter_offset_only(fetch_until_empty(*consumer)); ASSERT_EQ(fetched.size(), 1); ASSERT_EQ( @@ -190,7 +206,7 @@ TEST_P(BasicConsumerFixture, TestUnassignPartition) { } { - auto fetched = fetch_until_empty(*consumer); + auto fetched = filter_offset_only(fetch_until_empty(*consumer)); ASSERT_EQ(fetched.size(), 2); ASSERT_EQ( fetched.find(model::topic_partition{topic, model::partition_id{2}}), @@ -214,7 +230,7 @@ TEST_P(BasicConsumerFixture, TestUnassignPartition) { } { - auto fetched = fetch_until_empty(*consumer); + auto fetched = filter_offset_only(fetch_until_empty(*consumer)); ASSERT_EQ(fetched.size(), 1); for (auto p : std::array{0, 2}) { @@ -238,7 +254,7 @@ TEST_P(BasicConsumerFixture, TestUnassignPartition) { } { - auto fetched = fetch_until_empty(*consumer); + auto fetched = filter_offset_only(fetch_until_empty(*consumer)); ASSERT_EQ(fetched.size(), 1); for (auto p : std::array{1, 2}) { @@ -266,7 +282,7 @@ TEST_P(BasicConsumerFixture, TestUnassignTopic) { } { - auto fetched = fetch_until_empty(*consumer); + auto fetched = filter_offset_only(fetch_until_empty(*consumer)); ASSERT_EQ(fetched.size(), 3); for (auto id : std::array{0, 1, 2}) { ASSERT_EQ( @@ -286,7 +302,7 @@ TEST_P(BasicConsumerFixture, TestUnassignTopic) { } { - auto fetched = fetch_until_empty(*consumer); + auto fetched = filter_offset_only(fetch_until_empty(*consumer)); ASSERT_EQ(fetched.size(), 0); } } @@ -304,7 +320,7 @@ TEST_P(BasicConsumerFixture, TestBogusPartitionIds) { } { - auto fetched = fetch_until_empty(*consumer); + auto fetched = filter_offset_only(fetch_until_empty(*consumer)); ASSERT_EQ(fetched.size(), 2); for (auto id : std::array{0, 2}) { ASSERT_EQ( @@ -326,7 +342,7 @@ TEST_P(BasicConsumerFixture, TestBogusPartitionIds) { } { - auto fetched = fetch_until_empty(*consumer); + auto fetched = filter_offset_only(fetch_until_empty(*consumer)); 
ASSERT_EQ(fetched.size(), 2); for (auto id : std::array{0, 2}) { ASSERT_EQ( @@ -609,10 +625,7 @@ TEST_F(FetchSessionFixture, TestFetchRequestContents) { assign_partitions(make_assignment( topic, initial_assignment | std::ranges::to>())); - for (int i = 0; i < 10; ++i) { - auto fetched = consumer->fetch_next(100ms).get(); - ASSERT_TRUE(fetched.value().empty()); - } + ASSERT_TRUE(filter_offset_only(fetch_until_empty(*consumer)).empty()); constexpr int64_t n = 100; @@ -621,7 +634,7 @@ TEST_F(FetchSessionFixture, TestFetchRequestContents) { } { - auto fetched = fetch_until_empty(*consumer); + auto fetched = filter_offset_only(fetch_until_empty(*consumer)); ASSERT_EQ(fetched.size(), initial_assignment.size()); } @@ -654,7 +667,7 @@ TEST_F(FetchSessionFixture, TestFetchRequestContents) { } { - auto fetched = fetch_until_empty(*consumer); + auto fetched = filter_offset_only(fetch_until_empty(*consumer)); ASSERT_EQ(fetched.size(), second_produce.size()); } @@ -696,10 +709,7 @@ TEST_F(FetchSessionFixture, TestFetchRequestUnassignContents) { assign_partitions(make_assignment( topic, initial_assignment | std::ranges::to>())); - for (int i = 0; i < 10; ++i) { - auto fetched = consumer->fetch_next(100ms).get(); - ASSERT_TRUE(fetched.value().empty()); - } + ASSERT_TRUE(filter_offset_only(fetch_until_empty(*consumer)).empty()); constexpr int64_t n = 100; @@ -708,7 +718,7 @@ TEST_F(FetchSessionFixture, TestFetchRequestUnassignContents) { } { - auto fetched = fetch_until_empty(*consumer); + auto fetched = filter_offset_only(fetch_until_empty(*consumer)); ASSERT_EQ(fetched.size(), initial_assignment.size()); } @@ -733,7 +743,7 @@ TEST_F(FetchSessionFixture, TestFetchRequestUnassignContents) { } { - auto fetched = fetch_until_empty(*consumer); + auto fetched = filter_offset_only(fetch_until_empty(*consumer)); ASSERT_EQ(fetched.size(), p_first_unassign); } From 3fcbd56f68d2f80caec15791b02dab4dee3a1067 Mon Sep 17 00:00:00 2001 From: joe-redpanda Date: Tue, 4 Nov 2025 11:12:15 -0800 
Subject: [PATCH 3/4] cache progress, working with preemptive filtering --- .../client/direct_consumer/direct_consumer.cc | 47 +++++++++++++++---- .../client/direct_consumer/direct_consumer.h | 3 ++ src/v/kafka/client/direct_consumer/fetcher.cc | 3 +- .../tests/direct_consumer_fixture.cc | 8 +--- .../tests/direct_consumer_fixture_test.cc | 14 +++--- 5 files changed, 51 insertions(+), 24 deletions(-) diff --git a/src/v/kafka/client/direct_consumer/direct_consumer.cc b/src/v/kafka/client/direct_consumer/direct_consumer.cc index a1f2b3cdbe17d..fa9ac2b7309b7 100644 --- a/src/v/kafka/client/direct_consumer/direct_consumer.cc +++ b/src/v/kafka/client/direct_consumer/direct_consumer.cc @@ -17,6 +17,7 @@ #include "ssx/future-util.h" #include +#include #include namespace kafka::client { @@ -38,26 +39,41 @@ direct_consumer::direct_consumer( ss::future direct_consumer::fetch_next(std::chrono::milliseconds timeout) { + static constexpr auto jitter = std::chrono::milliseconds(1); if (!_started) [[unlikely]] { throw std::runtime_error("Direct consumer is not started"); } auto holder = _gate.hold(); + + auto started = ss::lowres_clock::now(); + auto deadline = started + timeout; try { - auto maybe_response_to_filter = co_await _fetched_data_queue->pop( - timeout); - if (maybe_response_to_filter.has_error()) { + while (ss::lowres_clock::now() < deadline) { + auto remaining_timeout = deadline - ss::lowres_clock::now() + + jitter; + auto maybe_response_to_filter = co_await _fetched_data_queue->pop( + std::chrono::duration_cast( + remaining_timeout)); + if (maybe_response_to_filter.has_error()) { + co_return maybe_response_to_filter; + } + + // the remainder is synchronous, no lock is needed + auto& response_to_filter = maybe_response_to_filter.value(); + filter_stale_subscriptions(response_to_filter); + update_start_offsets(response_to_filter); + filter_empty(response_to_filter); + if (response_to_filter.empty()) { + continue; + } + co_return maybe_response_to_filter; } - // the 
remainder is synchronous, no lock is needed - auto& response_to_filter = maybe_response_to_filter.value(); - filter_stale_subscriptions(response_to_filter); - update_start_offsets(response_to_filter); - co_return maybe_response_to_filter; - } catch (ss::condition_variable_timed_out&) { co_return chunked_vector{}; } + co_return chunked_vector{}; } void direct_consumer::filter_stale_subscriptions( @@ -157,6 +173,19 @@ void direct_consumer::update_start_offsets( } } +void direct_consumer::filter_empty( + chunked_vector& responses_to_filter) const { + // drop every topic whose total_bytes is 0 + auto empty_subsegment = std::ranges::partition( + responses_to_filter, [](fetched_topic_data& topic_data) { + return topic_data.total_bytes != 0; + }); + + auto erase_iterator = responses_to_filter.end() - empty_subsegment.size(); + + responses_to_filter.erase_to_end(erase_iterator); +} + void direct_consumer::update_configuration(configuration cfg) { vlog( _cluster->logger().info, diff --git a/src/v/kafka/client/direct_consumer/direct_consumer.h b/src/v/kafka/client/direct_consumer/direct_consumer.h index 8ad87700c187e..0ded99647d280 100644 --- a/src/v/kafka/client/direct_consumer/direct_consumer.h +++ b/src/v/kafka/client/direct_consumer/direct_consumer.h @@ -185,6 +185,9 @@ class direct_consumer { void update_start_offsets( const chunked_vector& fetched_data); + void + filter_empty(chunked_vector& responses_to_filter) const; + cluster* _cluster; offset_reset_policy _reset_policy diff --git a/src/v/kafka/client/direct_consumer/fetcher.cc b/src/v/kafka/client/direct_consumer/fetcher.cc index a80bb93502836..82aabc347f206 100644 --- a/src/v/kafka/client/direct_consumer/fetcher.cc +++ b/src/v/kafka/client/direct_consumer/fetcher.cc @@ -125,7 +125,8 @@ fetcher::fetcher( , _state_lock("fetcher/state") {} void fetcher::start() { - ssx::repeat_until_gate_closed(_gate, [this] { return do_fetch(); }); + ssx::repeat_until_gate_closed_or_aborted( + _gate, _as, [this] { return do_fetch(); 
}); } ss::future<> fetcher::stop() { diff --git a/src/v/kafka/client/direct_consumer/tests/direct_consumer_fixture.cc b/src/v/kafka/client/direct_consumer/tests/direct_consumer_fixture.cc index 45a7d0fb6afd0..d0d9c67d04323 100644 --- a/src/v/kafka/client/direct_consumer/tests/direct_consumer_fixture.cc +++ b/src/v/kafka/client/direct_consumer/tests/direct_consumer_fixture.cc @@ -291,14 +291,8 @@ consumer_fixture::fetch_until_empty(direct_consumer& consumer) { chunked_vector> ret; - auto timeout = 1s; - auto deadline = ss::lowres_clock::now() + timeout; - while (true) { - if (ss::lowres_clock::now() > deadline) { - break; - } - auto fetched = consumer.fetch_next(100ms).get(); + auto fetched = consumer.fetch_next(1000ms).get(); if (fetched.value().empty()) { break; diff --git a/src/v/kafka/client/direct_consumer/tests/direct_consumer_fixture_test.cc b/src/v/kafka/client/direct_consumer/tests/direct_consumer_fixture_test.cc index e884dcce42c87..903cde53191bb 100644 --- a/src/v/kafka/client/direct_consumer/tests/direct_consumer_fixture_test.cc +++ b/src/v/kafka/client/direct_consumer/tests/direct_consumer_fixture_test.cc @@ -33,13 +33,13 @@ filter_offset_only( model::topic_partition, chunked_vector> ret; - for (auto& [k, v] : fetch) { - if (v.empty()) { - continue; - } - ret.emplace(k, std::move(v)); - } - return ret; + // for (auto& [k, v] : fetch) { + // if (v.empty()) { + // continue; + // } + // ret.emplace(k, std::move(v)); + // } + return fetch; } } // namespace From 78801f064f647d3fab4d54eac32c3da106721dcf Mon Sep 17 00:00:00 2001 From: joe-redpanda Date: Tue, 4 Nov 2025 13:38:25 -0800 Subject: [PATCH 4/4] progress --- .../kafka/client/direct_consumer/api_types.h | 7 + .../client/direct_consumer/direct_consumer.cc | 144 +++++++++++------- .../client/direct_consumer/direct_consumer.h | 3 +- .../tests/direct_consumer_fixture.cc | 30 +++- .../tests/direct_consumer_fixture.h | 13 +- .../tests/direct_consumer_fixture_test.cc | 65 +++----- 
.../tests/direct_consumer_test.cc | 47 ++++-- 7 files changed, 189 insertions(+), 120 deletions(-) diff --git a/src/v/kafka/client/direct_consumer/api_types.h b/src/v/kafka/client/direct_consumer/api_types.h index ba37963306c34..ea009e7dea014 100644 --- a/src/v/kafka/client/direct_consumer/api_types.h +++ b/src/v/kafka/client/direct_consumer/api_types.h @@ -90,6 +90,13 @@ struct source_partition_offsets { kafka::offset last_stable_offset{-1}; // The timestamp that the fetch response was received by the client ss::lowres_clock::time_point last_offset_update_timestamp{}; + + // check all except timestamp + bool operator==(const source_partition_offsets& other) const { + return log_start_offset == other.log_start_offset + && high_watermark == other.high_watermark + && last_stable_offset == other.last_stable_offset; + } }; struct partition_offset { diff --git a/src/v/kafka/client/direct_consumer/direct_consumer.cc b/src/v/kafka/client/direct_consumer/direct_consumer.cc index fa9ac2b7309b7..afcd3739d050a 100644 --- a/src/v/kafka/client/direct_consumer/direct_consumer.cc +++ b/src/v/kafka/client/direct_consumer/direct_consumer.cc @@ -39,21 +39,22 @@ direct_consumer::direct_consumer( ss::future direct_consumer::fetch_next(std::chrono::milliseconds timeout) { - static constexpr auto jitter = std::chrono::milliseconds(1); + static constexpr auto jitter = std::chrono::milliseconds{1}; if (!_started) [[unlikely]] { throw std::runtime_error("Direct consumer is not started"); } auto holder = _gate.hold(); - auto started = ss::lowres_clock::now(); - auto deadline = started + timeout; + auto deadline = ss::lowres_clock::now() + timeout; + try { while (ss::lowres_clock::now() < deadline) { - auto remaining_timeout = deadline - ss::lowres_clock::now() - + jitter; + auto timeout_remaining + = std::chrono::duration_cast( + deadline - ss::lowres_clock::now() + jitter); + auto maybe_response_to_filter = co_await _fetched_data_queue->pop( - std::chrono::duration_cast( - 
remaining_timeout)); + timeout_remaining); if (maybe_response_to_filter.has_error()) { co_return maybe_response_to_filter; } @@ -62,7 +63,9 @@ direct_consumer::fetch_next(std::chrono::milliseconds timeout) { auto& response_to_filter = maybe_response_to_filter.value(); filter_stale_subscriptions(response_to_filter); update_start_offsets(response_to_filter); - filter_empty(response_to_filter); + + // if the filters have removed everything, there are no updates. + // rejoin the queue if (response_to_filter.empty()) { continue; } @@ -122,55 +125,82 @@ void direct_consumer::filter_stale_subscriptions( } void direct_consumer::update_start_offsets( - const chunked_vector& fetched_data) { - for (const auto& topic_data : fetched_data) { - for (const auto& partition_data : topic_data.partitions) { - // we have already filtered the map, we know these will be found - auto maybe_subscription = find_subscription( - topic_data.topic, partition_data.partition_id); - vassert( - maybe_subscription.has_value(), - "invoke only after filtering, all subscriptions should be " - "consistent"); - auto& subscription = maybe_subscription->get(); - source_partition_offsets spo{ - .log_start_offset = partition_data.start_offset, - .high_watermark = partition_data.high_watermark, - .last_stable_offset = partition_data.last_stable_offset, - .last_offset_update_timestamp = ss::lowres_clock::now()}; - if ( - subscription.last_known_source_offsets.log_start_offset - > spo.log_start_offset) { - vlog( - _cluster->logger().error, - "log start offset should never move backward, current: {}, " - "found: {}", - subscription.last_known_source_offsets.log_start_offset, - spo.log_start_offset); - } - if ( - subscription.last_known_source_offsets.high_watermark - > spo.high_watermark) { - vlog( - _cluster->logger().warn, - "high watermark should not normally move backward, current: " - "{}, found: {}", - subscription.last_known_source_offsets.high_watermark, - spo.high_watermark); - } - if ( - 
subscription.last_known_source_offsets.last_stable_offset - > spo.last_stable_offset) { - vlog( - _cluster->logger().error, - "last known stable should never move backward, " - "current: {}, found: {}", - subscription.last_known_source_offsets.last_stable_offset, - spo.last_stable_offset); - } - subscription.last_known_source_offsets = spo; - } - } + chunked_vector& fetched_data) { + auto empty_topic_subsegment = std::ranges::partition( + fetched_data, [this](auto& topic_data) { + // for (auto& topic_data : fetched_data) { + // remove anything with no updates: it must have no data and have no + // offset updates + auto non_updated_subsegment = std::ranges::partition( + topic_data.partitions, [this, &topic_data](auto& partition_data) { + auto maybe_subscription = find_subscription( + topic_data.topic, partition_data.partition_id); + vassert( + maybe_subscription.has_value(), + "invoke only after filtering, all subscriptions should be " + "consistent"); + auto& subscription = maybe_subscription->get(); + source_partition_offsets spo{ + .log_start_offset = partition_data.start_offset, + .high_watermark = partition_data.high_watermark, + .last_stable_offset = partition_data.last_stable_offset, + .last_offset_update_timestamp = ss::lowres_clock::now()}; + + // not an update, screen it out + if ( + spo == subscription.last_known_source_offsets + && partition_data.data.size() == 0) { + subscription.last_known_source_offsets + .last_offset_update_timestamp + = ss::lowres_clock::now(); + return false; + } + + if ( + subscription.last_known_source_offsets.log_start_offset + > spo.log_start_offset) { + vlog( + _cluster->logger().error, + "log start offset should never move backward, current: " + "{}, " + "found: {}", + subscription.last_known_source_offsets.log_start_offset, + spo.log_start_offset); + } + if ( + subscription.last_known_source_offsets.high_watermark + > spo.high_watermark) { + vlog( + _cluster->logger().warn, + "high watermark should not normally move 
backward, " + "current: " + "{}, found: {}", + subscription.last_known_source_offsets.high_watermark, + spo.high_watermark); + } + if ( + subscription.last_known_source_offsets.last_stable_offset + > spo.last_stable_offset) { + vlog( + _cluster->logger().error, + "last known stable should never move backward, " + "current: {}, found: {}", + subscription.last_known_source_offsets.last_stable_offset, + spo.last_stable_offset); + } + subscription.last_known_source_offsets = spo; + return true; + }); + + // no need to update sizes, if there were data, it would not be + // filtered out + topic_data.partitions.erase_to_end(non_updated_subsegment.begin()); + + // gather the empty topics to the end s.t. they can be filtered out + return !topic_data.partitions.empty(); + }); + + fetched_data.erase_to_end(empty_topic_subsegment.begin()); } void direct_consumer::filter_empty( diff --git a/src/v/kafka/client/direct_consumer/direct_consumer.h b/src/v/kafka/client/direct_consumer/direct_consumer.h index 0ded99647d280..39a12e34976a7 100644 --- a/src/v/kafka/client/direct_consumer/direct_consumer.h +++ b/src/v/kafka/client/direct_consumer/direct_consumer.h @@ -182,8 +182,7 @@ class direct_consumer { void filter_stale_subscriptions( chunked_vector& responses_to_filter) const; - void update_start_offsets( - const chunked_vector& fetched_data); + void update_start_offsets(chunked_vector& fetched_data); void filter_empty(chunked_vector& responses_to_filter) const; diff --git a/src/v/kafka/client/direct_consumer/tests/direct_consumer_fixture.cc b/src/v/kafka/client/direct_consumer/tests/direct_consumer_fixture.cc index d0d9c67d04323..f1c197294aff6 100644 --- a/src/v/kafka/client/direct_consumer/tests/direct_consumer_fixture.cc +++ b/src/v/kafka/client/direct_consumer/tests/direct_consumer_fixture.cc @@ -285,14 +285,16 @@ consumer_fixture::consume_from_partition( } chunked_hash_map> -consumer_fixture::fetch_until_empty(direct_consumer& consumer) { +consumer_fixture::gather_fetches( + 
direct_consumer& consumer, std::chrono::milliseconds gather_time) { chunked_hash_map< model::topic_partition, chunked_vector> ret; - while (true) { - auto fetched = consumer.fetch_next(1000ms).get(); + auto deadline = ss::lowres_clock::now() + gather_time; + while (ss::lowres_clock::now() < deadline) { + auto fetched = consumer.fetch_next(100ms).get(); if (fetched.value().empty()) { break; @@ -310,6 +312,28 @@ consumer_fixture::fetch_until_empty(direct_consumer& consumer) { return ret; } +chunked_hash_map> +consumer_fixture::filter_offset_only( + chunked_hash_map> + fetch) { + chunked_hash_map< + model::topic_partition, + chunked_vector> + ret{}; + + std::vector empty_keys{ + std::from_range, + fetch | std::ranges::views::filter([](const auto& map_pair) { + return map_pair.second.size() == 0; + }) | std::ranges::views::transform([](const auto& map_pair) { + return map_pair.first; + })}; + for (const auto& empty_key : empty_keys) { + fetch.erase(empty_key); + } + return fetch; +} + void consumer_fixture::assign_partitions(topic_assignment assgn) { consumer ->assign_partitions( diff --git a/src/v/kafka/client/direct_consumer/tests/direct_consumer_fixture.h b/src/v/kafka/client/direct_consumer/tests/direct_consumer_fixture.h index d02a30bc35356..4f5f87b99696e 100644 --- a/src/v/kafka/client/direct_consumer/tests/direct_consumer_fixture.h +++ b/src/v/kafka/client/direct_consumer/tests/direct_consumer_fixture.h @@ -59,10 +59,12 @@ class consumer_fixture : public cluster_test_fixture { ss::future> consume_from_partition( const model::topic& topic, int partition, kafka::offset offset); + // will gather consumer data for 1s chunked_hash_map< model::topic_partition, chunked_vector> - fetch_until_empty(direct_consumer& consumer); + gather_fetches( + direct_consumer& consumer, std::chrono::milliseconds gather_time = 1s); void assign_partitions(topic_assignment assgn); void unassign_partition(model::topic_partition tp); void unassign_topic(model::topic topic); @@ -73,6 +75,15 
@@ class consumer_fixture : public cluster_test_fixture { application* create_node_application(model::node_id node_id); + // remove all fetches with only offset updates + chunked_hash_map< + model::topic_partition, + chunked_vector> + filter_offset_only( + chunked_hash_map< + model::topic_partition, + chunked_vector> fetch); + protected: redpanda_thread_fixture* rp; std::unique_ptr cluster; diff --git a/src/v/kafka/client/direct_consumer/tests/direct_consumer_fixture_test.cc b/src/v/kafka/client/direct_consumer/tests/direct_consumer_fixture_test.cc index 903cde53191bb..2dd590417cf7c 100644 --- a/src/v/kafka/client/direct_consumer/tests/direct_consumer_fixture_test.cc +++ b/src/v/kafka/client/direct_consumer/tests/direct_consumer_fixture_test.cc @@ -16,45 +16,28 @@ #include +#include + using namespace kafka::client; using ConsumerFixture = kafka::client::tests::consumer_fixture; using BasicConsumerFixture = kafka::client::tests::basic_consumer_fixture; namespace { ss::logger logger{"direct-consumer-test"}; - -// remove all fetches with only offset updates -chunked_hash_map> -filter_offset_only( - chunked_hash_map< - model::topic_partition, - chunked_vector>&& fetch) { - chunked_hash_map< - model::topic_partition, - chunked_vector> - ret; - // for (auto& [k, v] : fetch) { - // if (v.empty()) { - // continue; - // } - // ret.emplace(k, std::move(v)); - // } - return fetch; -} } // namespace TEST_P(BasicConsumerFixture, TestBasicConsumption) { assign_partitions(make_assignment(topic, {0, 1, 2})); // no data should be available immediately, as the topic is empty - ASSERT_TRUE(filter_offset_only(fetch_until_empty(*consumer)).empty()); + ASSERT_TRUE(filter_offset_only(gather_fetches(*consumer)).empty()); // produce some data produce_to_partition(topic, 0, 1000).get(); produce_to_partition(topic, 1, 400).get(); produce_to_partition(topic, 2, 20).get(); - auto fetched = filter_offset_only(fetch_until_empty(*consumer)); + auto fetched = 
filter_offset_only(gather_fetches(*consumer)); ASSERT_EQ(fetched.size(), 3); ASSERT_EQ( @@ -77,7 +60,7 @@ TEST_P(BasicConsumerFixture, TestBasicConsumption) { produce_to_partition(topic, 2, 1000).get(); produce_to_partition(topic, 1, 400).get(); produce_to_partition(topic, 0, 20).get(); - auto fetched_2 = filter_offset_only(fetch_until_empty(*consumer)); + auto fetched_2 = filter_offset_only(gather_fetches(*consumer)); ASSERT_EQ(fetched_2.size(), 3); ASSERT_EQ( fetched_2[model::topic_partition(topic, model::partition_id(0))] @@ -111,7 +94,7 @@ TEST_P(BasicConsumerFixture, TestBasicLeadershipTransfer) { .get(); { // fist fetch and assert - auto fetched = filter_offset_only(fetch_until_empty(*consumer)); + auto fetched = filter_offset_only(gather_fetches(*consumer)); ASSERT_EQ( fetched[model::topic_partition(topic, test_partition_id)] @@ -127,7 +110,7 @@ TEST_P(BasicConsumerFixture, TestBasicLeadershipTransfer) { .get(); { // second fetch and assert - auto fetched = filter_offset_only(fetch_until_empty(*consumer)); + auto fetched = filter_offset_only(gather_fetches(*consumer)); ASSERT_EQ(fetched.size(), 1); ASSERT_EQ( @@ -137,7 +120,7 @@ TEST_P(BasicConsumerFixture, TestBasicLeadershipTransfer) { model::offset(first_produce_count + second_produce_count - 1)); } } - +/* TEST_P(BasicConsumerFixture, TestBasicNodeRestart) { // constants constexpr uint first_produce_count = 10; @@ -153,7 +136,7 @@ TEST_P(BasicConsumerFixture, TestBasicNodeRestart) { .get(); { // fist fetch and assert - auto fetched = filter_offset_only(fetch_until_empty(*consumer)); + auto fetched = filter_offset_only(gather_fetches(*consumer)); ASSERT_EQ( fetched[model::topic_partition(topic, test_partition_id)] @@ -184,7 +167,7 @@ TEST_P(BasicConsumerFixture, TestBasicNodeRestart) { .get(); { // second fetch and assert - auto fetched = filter_offset_only(fetch_until_empty(*consumer)); + auto fetched = filter_offset_only(gather_fetches(*consumer)); ASSERT_EQ(fetched.size(), 1); ASSERT_EQ( @@ -206,7 
+189,7 @@ TEST_P(BasicConsumerFixture, TestUnassignPartition) { } { - auto fetched = filter_offset_only(fetch_until_empty(*consumer)); + auto fetched = filter_offset_only(gather_fetches(*consumer)); ASSERT_EQ(fetched.size(), 2); ASSERT_EQ( fetched.find(model::topic_partition{topic, model::partition_id{2}}), @@ -230,7 +213,7 @@ TEST_P(BasicConsumerFixture, TestUnassignPartition) { } { - auto fetched = filter_offset_only(fetch_until_empty(*consumer)); + auto fetched = filter_offset_only(gather_fetches(*consumer)); ASSERT_EQ(fetched.size(), 1); for (auto p : std::array{0, 2}) { @@ -254,7 +237,7 @@ TEST_P(BasicConsumerFixture, TestUnassignPartition) { } { - auto fetched = filter_offset_only(fetch_until_empty(*consumer)); + auto fetched = filter_offset_only(gather_fetches(*consumer)); ASSERT_EQ(fetched.size(), 1); for (auto p : std::array{1, 2}) { @@ -282,7 +265,7 @@ TEST_P(BasicConsumerFixture, TestUnassignTopic) { } { - auto fetched = filter_offset_only(fetch_until_empty(*consumer)); + auto fetched = filter_offset_only(gather_fetches(*consumer)); ASSERT_EQ(fetched.size(), 3); for (auto id : std::array{0, 1, 2}) { ASSERT_EQ( @@ -302,7 +285,7 @@ TEST_P(BasicConsumerFixture, TestUnassignTopic) { } { - auto fetched = filter_offset_only(fetch_until_empty(*consumer)); + auto fetched = filter_offset_only(gather_fetches(*consumer)); ASSERT_EQ(fetched.size(), 0); } } @@ -320,7 +303,7 @@ TEST_P(BasicConsumerFixture, TestBogusPartitionIds) { } { - auto fetched = filter_offset_only(fetch_until_empty(*consumer)); + auto fetched = filter_offset_only(gather_fetches(*consumer)); ASSERT_EQ(fetched.size(), 2); for (auto id : std::array{0, 2}) { ASSERT_EQ( @@ -342,7 +325,7 @@ TEST_P(BasicConsumerFixture, TestBogusPartitionIds) { } { - auto fetched = filter_offset_only(fetch_until_empty(*consumer)); + auto fetched = filter_offset_only(gather_fetches(*consumer)); ASSERT_EQ(fetched.size(), 2); for (auto id : std::array{0, 2}) { ASSERT_EQ( @@ -353,7 +336,7 @@ TEST_P(BasicConsumerFixture, 
TestBogusPartitionIds) { } } } - +*/ using session_config = kafka::client::tests::session_config; INSTANTIATE_TEST_SUITE_P( test_with_basic_consume_fixture, @@ -625,7 +608,7 @@ TEST_F(FetchSessionFixture, TestFetchRequestContents) { assign_partitions(make_assignment( topic, initial_assignment | std::ranges::to>())); - ASSERT_TRUE(filter_offset_only(fetch_until_empty(*consumer)).empty()); + ASSERT_TRUE(filter_offset_only(gather_fetches(*consumer)).empty()); constexpr int64_t n = 100; @@ -634,7 +617,7 @@ TEST_F(FetchSessionFixture, TestFetchRequestContents) { } { - auto fetched = filter_offset_only(fetch_until_empty(*consumer)); + auto fetched = filter_offset_only(gather_fetches(*consumer)); ASSERT_EQ(fetched.size(), initial_assignment.size()); } @@ -667,7 +650,7 @@ TEST_F(FetchSessionFixture, TestFetchRequestContents) { } { - auto fetched = filter_offset_only(fetch_until_empty(*consumer)); + auto fetched = filter_offset_only(gather_fetches(*consumer)); ASSERT_EQ(fetched.size(), second_produce.size()); } @@ -709,7 +692,7 @@ TEST_F(FetchSessionFixture, TestFetchRequestUnassignContents) { assign_partitions(make_assignment( topic, initial_assignment | std::ranges::to>())); - ASSERT_TRUE(filter_offset_only(fetch_until_empty(*consumer)).empty()); + ASSERT_TRUE(filter_offset_only(gather_fetches(*consumer)).empty()); constexpr int64_t n = 100; @@ -718,7 +701,7 @@ TEST_F(FetchSessionFixture, TestFetchRequestUnassignContents) { } { - auto fetched = filter_offset_only(fetch_until_empty(*consumer)); + auto fetched = filter_offset_only(gather_fetches(*consumer)); ASSERT_EQ(fetched.size(), initial_assignment.size()); } @@ -743,7 +726,7 @@ TEST_F(FetchSessionFixture, TestFetchRequestUnassignContents) { } { - auto fetched = filter_offset_only(fetch_until_empty(*consumer)); + auto fetched = filter_offset_only(gather_fetches(*consumer)); ASSERT_EQ(fetched.size(), p_first_unassign); } diff --git a/src/v/kafka/client/direct_consumer/tests/direct_consumer_test.cc 
b/src/v/kafka/client/direct_consumer/tests/direct_consumer_test.cc index 2d7c0310d37eb..7ebe8e30efd1a 100644 --- a/src/v/kafka/client/direct_consumer/tests/direct_consumer_test.cc +++ b/src/v/kafka/client/direct_consumer/tests/direct_consumer_test.cc @@ -246,21 +246,36 @@ struct consumer_test_mock : public ::testing::Test { ss::future<> fetch_and_append_to_map( topic_partition_map>& batches, direct_consumer& consumer) { - auto data = co_await consumer.fetch_next(std::chrono::seconds(4)); - for (auto& topic_data : data.value()) { - for (auto& partition_data : topic_data.partitions) { - auto& b_vector - = batches[topic_data.topic][partition_data.partition_id]; - vlog( - test_log.info, - "Fetched {} batches for topic: {}, partition: {}, " - "total_batches: {}", - partition_data.data.size(), - topic_data.topic, - partition_data.partition_id, - b_vector.size()); - std::ranges::move( - partition_data.data, std::back_inserter(b_vector)); + vlog(test_log.info, "begin fetch and append"); + auto deadline = ss::lowres_clock::now() + 4s; + while (ss::lowres_clock::now() < deadline) { + vlog(test_log.info, "running at all"); + auto data = co_await consumer.fetch_next(100ms); + for (auto& topic_data : data.value()) { + if (topic_data.total_bytes == 0) { + vlog( + test_log.info, + "Fetched empty batch set for topic: {}", + topic_data.topic); + continue; + } + for (auto& partition_data : topic_data.partitions) { + auto& b_vector + = batches[topic_data.topic][partition_data.partition_id]; + vlog( + test_log.info, + "Fetched {} batches for topic: {}, partition: {}, " + "total_batches: {}", + partition_data.data.size(), + topic_data.topic, + partition_data.partition_id, + b_vector.size()); + if (partition_data.size_bytes == 0) { + continue; + } + std::ranges::move( + partition_data.data, std::back_inserter(b_vector)); + } } } } @@ -341,7 +356,7 @@ TEST_F(consumer_test_mock, TestAssignUnassignPartitions) { consumer.assign_partitions(make_assignment(test_topic, {0})).get(); 
fetch_and_append_to_map(all_batches, consumer).get(); - auto data = consumer.fetch_next(std::chrono::seconds(4)).get(); + // auto data = consumer.fetch_next(std::chrono::seconds(4)).get(); ASSERT_EQ(all_batches.size(), 1); ASSERT_TRUE(all_batches.contains(test_topic)); ASSERT_EQ(all_batches[test_topic].size(), 1);