diff --git a/src/v/kafka/client/direct_consumer/api_types.h b/src/v/kafka/client/direct_consumer/api_types.h
index ff332f6e3835d..2cb15693b1fe3 100644
--- a/src/v/kafka/client/direct_consumer/api_types.h
+++ b/src/v/kafka/client/direct_consumer/api_types.h
@@ -20,6 +20,9 @@
 #include

 namespace kafka::client {
+
+using subscription_epoch = named_type;
+
 enum class offset_reset_policy : int8_t {
     // reset to the earliest offset
     earliest,
@@ -63,6 +66,8 @@ struct fetched_partition_data {
     chunked_vector data;
     kafka::error_code error = kafka::error_code::none;
     std::optional> aborted_transactions;
+    subscription_epoch subscription_epoch;
+    size_t size_bytes;
 };

 struct fetched_topic_data {
diff --git a/src/v/kafka/client/direct_consumer/direct_consumer.cc b/src/v/kafka/client/direct_consumer/direct_consumer.cc
index ffdb15880b12e..63257aae8ae93 100644
--- a/src/v/kafka/client/direct_consumer/direct_consumer.cc
+++ b/src/v/kafka/client/direct_consumer/direct_consumer.cc
@@ -10,11 +10,17 @@
  */
 #include "kafka/client/direct_consumer/direct_consumer.h"

+#include "kafka/client/direct_consumer/api_types.h"
 #include "kafka/client/direct_consumer/data_queue.h"
 #include "kafka/client/direct_consumer/fetcher.h"
 #include "model/validation.h"
 #include "ssx/future-util.h"

+#include
+
+#include
+#include
+
 namespace kafka::client {

 direct_consumer::direct_consumer(cluster& cluster, configuration cfg)
@@ -31,12 +37,75 @@ direct_consumer::fetch_next(std::chrono::milliseconds timeout) {
     }
     auto holder = _gate.hold();
     try {
-        co_return co_await _fetched_data_queue->pop(timeout);
+        auto maybe_response_to_filter = co_await _fetched_data_queue->pop(
+          timeout);
+        if (maybe_response_to_filter.has_error()) {
+            co_return maybe_response_to_filter;
+        }
+
+        // the remainder is synchronous, no lock is needed
+
+        auto& response_to_filter = maybe_response_to_filter.value();
+        filter_stale_subscriptions(response_to_filter);
+        co_return maybe_response_to_filter;
+
     } catch (ss::condition_variable_timed_out&) {
         co_return chunked_vector{};
     }
 }

+void direct_consumer::filter_stale_subscriptions(
+  chunked_vector& responses_to_filter) {
+    // for each topic, remove stale partitions
+    for (auto& topic_data : responses_to_filter) {
+        auto& partition_data = topic_data.partitions;
+
+        auto non_stale_subsegment = std::ranges::partition(
+          partition_data,
+          [this, &topic_data](const fetched_partition_data& data) mutable {
+              // decrement a stale partition's contribution to the topic size
+              const auto partition_size = data.size_bytes;
+              auto decrement_holder = ss::defer([&topic_data, partition_size] {
+                  topic_data.total_bytes -= partition_size;
+              });
+
+              auto maybe_current_subscription_epoch = find_subscription_epoch(
+                topic_data.topic, data.partition_id);
+
+              vlog(
+                _cluster->logger().debug,
+                "current subscription epoch: {}, found request epoch: {}",
+                maybe_current_subscription_epoch,
+                data.subscription_epoch);
+
+              // no longer assigned
+              if (!maybe_current_subscription_epoch) {
+                  return false;
+              }
+              bool is_not_stale = data.subscription_epoch
+                                  == *maybe_current_subscription_epoch;
+              if (is_not_stale) {
+                  decrement_holder.cancel();
+              }
+              return is_not_stale;
+          });
+
+        auto erase_iterator = partition_data.end()
+                              - non_stale_subsegment.size();
+
+        partition_data.erase_to_end(erase_iterator);
+    }
+
+    // remove newly empty topics
+    auto non_empty_subsegment = std::ranges::partition(
+      responses_to_filter, [](const fetched_topic_data& topic_data) {
+          return !topic_data.partitions.empty();
+      });
+
+    responses_to_filter.erase_to_end(
+      responses_to_filter.end() - non_empty_subsegment.size());
+}
+
 void direct_consumer::update_configuration(configuration cfg) {
     vlog(
       _cluster->logger().info,
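[Editor's aside: filter_stale_subscriptions leans twice on the same partition-then-erase idiom. A minimal standalone sketch of it, using a hypothetical `part` type and a plain std::vector in place of chunked_vector: std::ranges::partition moves the elements that satisfy the predicate to the front and returns the subrange of elements that do not, so the group to drop is always the last subrange.size() elements.]

    #include <algorithm>
    #include <cassert>
    #include <vector>

    struct part {
        int epoch;
    };

    // keep elements whose epoch matches current_epoch, drop the rest
    void drop_stale(std::vector<part>& parts, int current_epoch) {
        auto stale_tail = std::ranges::partition(
          parts, [current_epoch](const part& p) {
              return p.epoch == current_epoch;
          });
        // the returned subrange is exactly the group that failed the
        // predicate, so erasing from end() - size() removes every stale entry
        parts.erase(parts.end() - stale_tail.size(), parts.end());
    }

    int main() {
        std::vector<part> parts{{1}, {2}, {1}, {2}};
        drop_stale(parts, 2);
        assert(parts.size() == 2);
        return 0;
    }
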
@@ -103,13 +172,17 @@ ss::future<> direct_consumer::update_fetchers(
             sub.current_fetcher = *leader_id;
             auto& new_fetcher = get_fetcher(*leader_id);
             co_await new_fetcher.assign_partition(
-              model::topic_partition_view(topic, p_id), sub.fetch_offset);
+              model::topic_partition_view(topic, p_id),
+              sub.fetch_offset,
+              sub.subscription_epoch);

         } else if (sub.fetch_offset) {
             // If the fetch offset is set, update it
             auto& current = get_fetcher(*sub.current_fetcher);
             co_await current.assign_partition(
-              model::topic_partition_view(topic, p_id), sub.fetch_offset);
+              model::topic_partition_view(topic, p_id),
+              sub.fetch_offset,
+              sub.subscription_epoch);
         }
         sub.fetch_offset.reset();
     }
@@ -148,6 +221,21 @@ fetcher& direct_consumer::get_fetcher(model::node_id id) {
     return *it->second;
 }

+std::optional direct_consumer::find_subscription_epoch(
+  const model::topic& topic, model::partition_id partition_id) {
+    auto t_it = _subscriptions.find(topic);
+    if (t_it == _subscriptions.end()) {
+        return std::nullopt;
+    }
+
+    auto& p_map = t_it->second;
+    auto p_it = p_map.find(partition_id);
+    if (p_it == p_map.end()) {
+        return std::nullopt;
+    }
+    return p_it->second.subscription_epoch;
+}
+
 ss::future<> direct_consumer::start() {
     if (_started) {
         co_return;
@@ -182,14 +270,17 @@ direct_consumer::assign_partitions(chunked_vector topics) {
               "Invalid topic name: {}, error: {}", t.topic, ec.message()));
         }
         for (auto& p : t.partitions) {
+            auto epoch_to_assign = ++epoch;
             vlog(
               _cluster->logger().trace,
-              "Assigning partition: {}/{} with offset: {}",
+              "Assigning partition: {}/{} with offset: {} and epoch {}",
               t.topic,
               p.partition_id,
-              p.next_offset);
-            auto& sub = _subscriptions[t.topic][p.partition_id];
-            sub.fetch_offset = p.next_offset;
+              p.next_offset,
+              epoch_to_assign);
+            _subscriptions[t.topic].insert_or_assign(
+              p.partition_id,
+              subscription{std::nullopt, p.next_offset, epoch_to_assign});
         }
     }
     co_await update_fetchers(std::move(lock_holder));
@@ -207,7 +298,7 @@ direct_consumer::unassign_topics(chunked_vector topics) {
         }
         removals[topic].reserve(state_it->second.size());
         for (const auto& [p_id, sub] : state_it->second) {
-            removals[topic][p_id] = sub;
+            removals[topic].insert_or_assign(p_id, sub);
         }
         _subscriptions.erase(topic);
     }
@@ -227,7 +318,7 @@ ss::future<> direct_consumer::unassign_partitions(

         auto p_it = state_it->second.find(tp.partition);
         if (p_it != state_it->second.end()) {
-            removals[tp.topic][tp.partition] = p_it->second;
+            removals[tp.topic].insert_or_assign(tp.partition, p_it->second);
             state_it->second.erase(p_it);
         }
         if (state_it->second.empty()) {
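[Editor's aside: the switches from map indexing to insert_or_assign above follow from `subscription` (declared in direct_consumer.h below) deleting its default constructor: operator[] must default-construct the mapped value before assigning, which no longer compiles. A minimal illustration with a hypothetical `value` type:]

    #include <map>
    #include <string>

    struct value {
        value() = delete; // mirrors `subscription() = delete` below
        explicit value(int epoch)
          : epoch(epoch) {}
        int epoch;
    };

    int main() {
        std::map<std::string, value> m;
        // m["k"] = value{1};  // ill-formed: operator[] default-constructs
        m.insert_or_assign("k", value{1}); // constructs or overwrites in place
        return m.at("k").epoch;
    }
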
diff --git a/src/v/kafka/client/direct_consumer/direct_consumer.h b/src/v/kafka/client/direct_consumer/direct_consumer.h
index 22fb9b27fbaa5..f1374e354fd7f 100644
--- a/src/v/kafka/client/direct_consumer/direct_consumer.h
+++ b/src/v/kafka/client/direct_consumer/direct_consumer.h
@@ -13,6 +13,7 @@
 #include "container/chunked_hash_map.h"
 #include "kafka/client/cluster.h"
 #include "kafka/client/direct_consumer/api_types.h"
+#include "model/fundamental.h"

 namespace kafka {
 struct metadata_response_data;
@@ -125,8 +126,24 @@ class direct_consumer {

 private:
     struct subscription {
+        subscription() = delete;
+        subscription(
+          std::optional current_fetcher,
+          std::optional fetch_offset,
+          subscription_epoch subscription_epoch) noexcept
+          : current_fetcher{current_fetcher}
+          , fetch_offset{fetch_offset}
+          , subscription_epoch{subscription_epoch} {}
+
+        subscription(const subscription&) = default;
+        subscription(subscription&&) = default;
+        subscription& operator=(const subscription&) = default;
+        subscription& operator=(subscription&&) = default;
+        ~subscription() = default;
+
         std::optional current_fetcher;
         std::optional fetch_offset;
+        subscription_epoch subscription_epoch;
     };
     friend class fetcher;
     void on_metadata_update(const metadata_response_data&);
@@ -138,6 +155,12 @@

     fetcher& get_fetcher(model::node_id id);

+    std::optional find_subscription_epoch(
+      const model::topic& topic, model::partition_id partition_id);
+
+    void filter_stale_subscriptions(
+      chunked_vector& responses_to_filter);
+
     cluster* _cluster;

     offset_reset_policy _reset_policy
@@ -152,6 +175,9 @@
     std::unique_ptr _fetched_data_queue;
     ss::condition_variable _data_available;

+    // tracks the version of a subscribed ntp
+    subscription_epoch epoch{0};
+
     cluster::callback_id _metadata_callback_id;
     bool _started = false;
     ss::gate _gate;
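[Editor's aside: the reason every assignment takes a fresh epoch is that data queued under an earlier assignment still carries the old subscription_epoch, so it can be recognized as stale at delivery time even though the fetch itself already completed. A toy model of the race, with hypothetical types rather than the real consumer:]

    #include <cassert>
    #include <cstdint>
    #include <deque>
    #include <optional>

    struct queued_data {
        std::uint64_t epoch;
        int record;
    };

    int main() {
        std::uint64_t epoch = 0;
        std::optional<std::uint64_t> subscribed;
        std::deque<queued_data> queue;

        subscribed = ++epoch;               // assign partition -> epoch 1
        queue.push_back({*subscribed, 42}); // a fetch completes and enqueues

        subscribed.reset();                 // unassign
        subscribed = ++epoch;               // re-assign -> epoch 2

        // fetch_next compares epochs before delivering queued data
        bool stale = !subscribed || queue.front().epoch != *subscribed;
        assert(stale); // the queued record predates the re-assignment
        return 0;
    }
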
diff --git a/src/v/kafka/client/direct_consumer/fetcher.cc b/src/v/kafka/client/direct_consumer/fetcher.cc
index e78cc5ac2a9f0..67a96d167437d 100644
--- a/src/v/kafka/client/direct_consumer/fetcher.cc
+++ b/src/v/kafka/client/direct_consumer/fetcher.cc
@@ -12,6 +12,7 @@

 #include "absl/container/flat_hash_set.h"
 #include "base/format_to.h"
+#include "kafka/client/direct_consumer/api_types.h"
 #include "kafka/client/direct_consumer/data_queue.h"
 #include "kafka/client/direct_consumer/direct_consumer.h"
 #include "kafka/client/errors.h"
@@ -24,6 +25,8 @@

 #include

+#include
+
 namespace kafka::client {
 static constexpr model::node_id client_replica_id{-1};
 static constexpr std::chrono::milliseconds error_backoff(200);
@@ -149,8 +152,12 @@ ss::future fetcher::collect_partitions() {
           partitions,
           [&to_process, &ret, inc = _session_state.incremental()](auto& p_fs) {
               partition_fetch_state& fetch_state = p_fs.second;
-              ret.assignment_epochs[to_process.topic][fetch_state.partition_id]
-                = fetch_state.assignment_epoch;
+
+              ret.snapshotted_epochs[to_process.topic].insert_or_assign(
+                fetch_state.partition_id,
+                epoch_set{
+                  fetch_state.fetcher_epoch, fetch_state.subscription_epoch});
+
               if (!fetch_state.fetch_offset.has_value()) {
                   to_process.to_list_offsets.push_back(fetch_state);
                   return;
@@ -274,49 +281,12 @@ bool fetcher::maybe_update_fetch_offset(
   const model::topic& topic,
   model::partition_id partition_id,
   kafka::offset last_received,
-  kafka::offset high_watermark,
-  std::optional maybe_response_epoch) {
-    auto t_it = _partitions.find(topic);
-    if (t_it == _partitions.end()) {
-        return false;
-    }
-    auto p_it = t_it->second.find(partition_id);
-    if (p_it == t_it->second.end()) {
-        return false;
-    }
-
-    // possible in a tight race
-    // 1. unassign from A
-    // 2. begin fetch
-    // 3. assign to A
-    // 4. broker receives update
-    // 5. the fetch will receive a response for which it did not have an epoch
-    // this is logically the same as epoch mismatch
-    if (!maybe_response_epoch) {
-        vlog(
-          logger().info,
-          "[broker: {}] Ignoring unbidden {}/{} current epoch: {}",
-          _id,
-          topic,
-          partition_id,
-          p_it->second.assignment_epoch);
-        return false;
-    }
-
-    const auto response_epoch = maybe_response_epoch.value();
-
-    if (p_it->second.assignment_epoch != response_epoch) {
-        vlog(
-          logger().trace,
-          "[broker: {}] Ignoring {}/{} reply, assignment epoch changed, "
-          "request epoch: {}, current epoch: {}",
-          _id,
-          topic,
-          partition_id,
-          response_epoch,
-          p_it->second.assignment_epoch);
+  kafka::offset high_watermark) {
+    auto maybe_fetcher_state = find_fetcher_state(topic, partition_id);
+    if (!maybe_fetcher_state) {
         return false;
     }
+    auto& fetcher_state = maybe_fetcher_state->get();

     vlog(
       logger().trace,
@@ -324,14 +294,14 @@ bool fetcher::maybe_update_fetch_offset(
       _id,
       topic,
       partition_id,
-      p_it->second.fetch_offset,
+      fetcher_state.fetch_offset,
       kafka::next_offset(last_received),
       high_watermark);

-    p_it->second.high_watermark = high_watermark;
-    p_it->second.fetch_offset = kafka::next_offset(last_received);
+    fetcher_state.high_watermark = high_watermark;
+    fetcher_state.fetch_offset = kafka::next_offset(last_received);
     // we updated the fetch offset, so we should sync with the broker's
     // fetch session on the next request
-    p_it->second.incremental_include = true;
+    fetcher_state.incremental_include = true;
     return true;
 }

@@ -347,11 +317,11 @@ ss::future<> fetcher::do_fetch() {
      * request.
      */
     auto partitions_with_epochs = co_await collect_partitions();
-    auto assignment_epochs = std::move(
-      partitions_with_epochs.assignment_epochs);
+    auto snapshotted_epochs = std::move(
+      partitions_with_epochs.snapshotted_epochs);

     auto list_offsets_err = co_await maybe_initialise_fetch_offsets(
-      partitions_with_epochs.partitions, assignment_epochs);
+      partitions_with_epochs.partitions, snapshotted_epochs);

     /**
      */
     if (list_offsets_err != kafka::error_code::none) {
@@ -382,7 +352,7 @@ ss::future<> fetcher::do_fetch() {
           _id, std::move(req), version);
         auto fetch_result = co_await process_fetch_response(
           std::move(response),
-          assignment_epochs,
+          snapshotted_epochs,
           partitions_with_epochs.partitions);

         if (fetch_result.has_error()) {
@@ -438,10 +408,10 @@ ss::future<> fetcher::do_fetch() {
     }
 }

-std::optional fetcher::find_assignment_epoch(
+std::optional fetcher::find_epoch_set(
   const model::topic& topic,
   model::partition_id partition,
-  const topic_partition_map& epochs) {
+  const topic_partition_map& epochs) {
     auto topic_iterator = epochs.find(topic);

     if (topic_iterator == epochs.end()) {
@@ -461,7 +431,7 @@ std::optional fetcher::find_assignment_epoch(
 ss::future>
 fetcher::process_fetch_response(
   fetch_response resp,
-  const topic_partition_map& epochs,
+  const topic_partition_map& epochs,
   const chunked_vector& partitions) {
     if (resp.data.error_code != kafka::error_code::none) {
         co_return resp.data.error_code;
     }
@@ -472,6 +442,24 @@

     auto lock = co_await _state_lock.get_units();

+    // we allow for assignment updates to occur while a fetch is ongoing s.t.
+    // assignment updates are not blocked by a longstanding fetch. At this
+    // point, all inconsistent fetch responses should be discarded
+    for (auto& topic_response : resp.data.responses) {
+        auto consistent_subrange = std::ranges::partition(
+          topic_response.partitions,
+          [this, &topic_response, &epochs](partition_data& partition_response) {
+              return is_consistent_fetcher_epoch(
+                topic_response.topic,
+                partition_response.partition_index,
+                epochs);
+          });
+        topic_response.partitions.erase_to_end(
+          topic_response.partitions.end() - consistent_subrange.size());
+    }
+
+    // all responses now belong to consistent tps
+
     // For fetch session maintenance, the goal is to omit partitions from each
     // fetch request whenever possible. The incremental_include flag controls
     // whether a certain partition appears in the next fetch request, after
@@ -504,6 +492,12 @@
             part_data.error = part_response.error_code;
             part_data.partition_id = part_response.partition_index;

+            // guaranteed to be found after consistency filter
+            part_data.subscription_epoch
+              = find_epoch_set(topic_data.topic, part_data.partition_id, epochs)
+                  .value()
+                  .subscription_epoch;
+
             if (part_response.error_code != kafka::error_code::none) {
                 if (
                   part_response.error_code
@@ -571,24 +565,22 @@
                 || part_response.records->is_end_of_stream()) {
                 continue;
             }
-            topic_data.total_bytes += part_response.records->size_bytes();
+
+            // size calculation for the response payload
+            const auto partition_payload_size
+              = part_response.records->size_bytes();
+            part_data.size_bytes = partition_payload_size;
+            topic_data.total_bytes += partition_payload_size;
+
             part_data.data = co_await reader_to_chunked_vector(
               std::move(part_response.records.value()));

-            auto maybe_response_epoch = find_assignment_epoch(
-              topic_data.topic, part_data.partition_id, epochs);
-
             bool updated_offset = maybe_update_fetch_offset(
               topic_data.topic,
               part_data.partition_id,
               model::offset_cast(part_data.data.back().last_offset()),
-              part_data.high_watermark,
-              maybe_response_epoch);
+              part_data.high_watermark);

             if (!updated_offset) {
-                // case when partition is either
-                // 1. partition is not assigned to this fetcher
-                // 2. assignment epoch changed
-                // for either skip
                 continue;
             }
             dirty_partitions[topic_data.topic].insert(
@@ -620,58 +612,24 @@
           "partitions, not both");

         if (!included.empty()) {
-            // _partitions maps topic -> parition map
-            // partition map maps partition -> subscription info
-            // get an iterator to the topic map
-            auto topic_iterator = _partitions.find(topic);
-            if (topic_iterator == _partitions.end()) {
-                continue;
-            }
-
             auto errs_it = dirty_partitions.find(topic);
             bool topic_err = errs_it != dirty_partitions.end();

-            auto& partition_map = topic_iterator->second;
-
             for (const auto& p : included) {
                 bool partition_err = topic_err
                                      && errs_it->second.contains(
                                        p.partition_id);
-                auto partition_iterator = partition_map.find(p.partition_id);
-                if (
-                  partition_iterator != partition_map.end() && !partition_err) {
-                    // compare the epochs before we make an edit
-                    const auto assigned_epoch
-                      = partition_iterator->second.assignment_epoch;
-
-                    auto maybe_request_epoch = find_assignment_epoch(
-                      topic, p.partition_id, epochs);
-
-                    if (!maybe_request_epoch) {
-                        // see maybe_update_fetch_offset for explanation
-                        vlog(
-                          logger().info,
-                          "unbidden response on ntp: {}/{}",
-                          topic,
-                          p.partition_id);
-                        continue;
-                    }
-
-                    const auto request_epoch = maybe_request_epoch.value();
-
-                    if (assigned_epoch == request_epoch) {
-                        partition_iterator->second.incremental_include = false;
-                    } else {
-                        vlog(
-                          logger().trace,
-                          "disclusion epoch mismatch on ntp: {}, "
-                          "fetched_epoch, {}, current_epoch: {}",
-                          topic,
-                          p.partition_id,
-                          request_epoch,
-                          assigned_epoch);
-                    }
+
+                // if errored, keep it in the next fetch
+                if (partition_err) {
+                    continue;
                 }
+
+                // remove from next fetch, guaranteed to exist after consistency
+                // filter
+                auto& fetcher_state
+                  = find_fetcher_state(topic, p.partition_id)->get();
+                fetcher_state.incremental_include = false;
             }
         }

@@ -711,7 +669,7 @@ void fetcher::reset_partition_offset(model::topic_partition_view tp) {
         return;
     }
     p_it->second.fetch_offset = std::nullopt;
-    p_it->second.assignment_epoch = next_epoch();
+    p_it->second.fetcher_epoch = next_epoch();
 }

 namespace {
@@ -727,7 +685,7 @@ model::timestamp timestamp_for_offset_reset_policy(offset_reset_policy policy) {
 ss::future fetcher::maybe_initialise_fetch_offsets(
   const chunked_vector& partitions,
-  const topic_partition_map& epochs) {
+  const topic_partition_map& epochs) {
     const auto timestamp = timestamp_for_offset_reset_policy(
       _parent->_config.reset_policy);
@@ -798,47 +756,18 @@ ss::future fetcher::maybe_initialise_fetch_offsets(
             }

             // global state may have changed, we're not locked
-            auto t_it = _partitions.find(response_topic.topic);
-            if (t_it == _partitions.end()) {
-                continue;
-            }
-            auto p_it = t_it->second.find(response_partition.partition_id);
-            if (p_it == t_it->second.end()) {
-                continue;
-            }
-
-            const auto assigned_epoch = p_it->second.assignment_epoch;
-
-            auto maybe_response_epoch = find_assignment_epoch(
-              response_topic.topic, response_partition.partition_id, epochs);
-
-            if (!maybe_response_epoch) {
-                vlog(
-                  logger().warn,
-                  "[broker: {} received a list topics response which was not "
-                  "requested on ntp: {}/{}",
-                  _id,
+            if (!is_consistent_fetcher_epoch(
                   response_topic.topic,
-                  response_partition.partition_id);
+                  response_partition.partition_id,
+                  epochs)) {
                 continue;
             }

-            const auto request_epoch = maybe_response_epoch.value();
+            auto& fetcher_state = find_fetcher_state(
+                                    response_topic.topic,
+                                    response_partition.partition_id)
+                                    ->get();

-            if (request_epoch != assigned_epoch) {
-                vlog(
-                  logger().trace,
-                  "[broker: {}] Skipping partition {}/{} list offset response "
-                  "as assignment epoch has changed. request_epoch: {}, "
-                  "current_epoch: {}",
-                  _id,
-                  response_topic.topic,
-                  response_partition.partition_id,
-                  response_partition.offset,
-                  request_epoch,
-                  p_it->second.assignment_epoch);
-                continue;
-            }
             vlog(
               logger().info,
               "[broker: {}] Resetting partition {}/{} fetch offset to: {}",
@@ -846,9 +775,9 @@ ss::future fetcher::maybe_initialise_fetch_offsets(
               _id,
               response_topic.topic,
               response_partition.partition_id,
               response_partition.offset);
-            p_it->second.fetch_offset = response_partition.offset;
-            p_it->second.high_watermark.reset();
-            p_it->second.incremental_include = true;
+            fetcher_state.fetch_offset = response_partition.offset;
+            fetcher_state.high_watermark.reset();
+            fetcher_state.incremental_include = true;
         }
     }

@@ -878,7 +807,9 @@ ss::future fetcher::get_list_offsets_request_version() const {
 }

 ss::future<> fetcher::assign_partition(
-  model::topic_partition_view tp, std::optional offset) {
+  model::topic_partition_view tp,
+  std::optional offset,
+  subscription_epoch subscription_epoch) {
     auto lock = co_await _state_lock.get_units();
     vlog(
       logger().debug,
@@ -887,19 +818,33 @@ ss::future<> fetcher::assign_partition(
       tp,
       offset);

-    _partitions[tp.topic][tp.partition] = partition_fetch_state{
-      .partition_id = tp.partition,
-      .fetch_offset = offset,
-      .assignment_epoch = next_epoch(),
-      .incremental_include = true};
+    auto maybe_existing_assignment = find_fetcher_state(tp.topic, tp.partition);
+    if (maybe_existing_assignment) {
+        auto& existing_assignment = maybe_existing_assignment->get();
+        vlog(
+          logger().warn,
+          "[broker: {}] "
+          "overwriting existing fetcher partition assignment "
+          "tp: {}, fetch_offset: {}, fetcher_epoch: {}, subscription_epoch: {}",
+          _id,
+          tp,
+          existing_assignment.fetch_offset,
+          existing_assignment.fetcher_epoch,
+          existing_assignment.subscription_epoch);
+    }
+
+    _partitions[tp.topic].insert_or_assign(
+      tp.partition,
+      partition_fetch_state(
+        tp.partition, offset, next_epoch(), subscription_epoch));

     // in the case of fast leadership transfers, we may have a partition both
-    // being added and forgotten, in which case, make sure that it is only being
-    // added
+    // being added and forgotten, in which case, make sure that it is only
+    // being added
     auto forget_topic_iterator = _partitions_to_forget.find(tp.topic);
     if (forget_topic_iterator != _partitions_to_forget.end()) {
-        // topic is in the 'to forget map', now is the partition in the topic's
-        // map
+        // topic is in the 'to forget map'; now check whether the partition
+        // is in the topic's map
         auto& partitions_to_forget = forget_topic_iterator->second;
         auto forget_partition_iterator = partitions_to_forget.find(
           tp.partition);
@@ -940,7 +885,8 @@ fetcher::unassign_partition(model::topic_partition_view tp_v) {

     auto fetch_offset = p_it->second.fetch_offset;
     partitions.erase(p_it);
     if (partitions.empty()) {
-        // if there are no partitions left for this topic, remove the topic
+        // if there are no partitions left for this topic, remove the
+        // topic
         _partitions.erase(it);
     }

@@ -980,7 +926,8 @@ fetcher::do_list_offsets(list_offsets_request req) {
     } catch (const broker_error& e) {
         vlog(
           logger().warn,
-          "list_offsets request to broker {} failed with broker error: {}",
+          "list_offsets request to broker {} failed with broker error: "
+          "{}",
           _id,
           e);
         co_return e.error;
@@ -994,6 +941,49 @@ fetcher::do_list_offsets(list_offsets_request req) {
     }
 }

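[Editor's aside: find_fetcher_state, added just below, returns std::optional<std::reference_wrapper<...>> so that a miss is explicit while a hit still permits in-place mutation. The shape in isolation, with hypothetical map and value types:]

    #include <functional>
    #include <map>
    #include <optional>
    #include <string>

    std::optional<std::reference_wrapper<int>>
    find_state(std::map<std::string, int>& m, const std::string& key) {
        auto it = m.find(key);
        if (it == m.end()) {
            return std::nullopt;
        }
        return it->second; // the lvalue binds as a reference_wrapper
    }

    int main() {
        std::map<std::string, int> m{{"a", 1}};
        if (auto state = find_state(m, "a")) {
            state->get() = 2; // mutates m["a"] through the wrapper
        }
        return m.at("a") == 2 ? 0 : 1;
    }
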
+std::optional>
+fetcher::find_fetcher_state(
+  const model::topic& topic, model::partition_id partition) {
+    auto t_it = _partitions.find(topic);
+    if (t_it == _partitions.end()) {
+        return std::nullopt;
+    }
+
+    auto& partition_assignments = t_it->second;
+    auto p_it = partition_assignments.find(partition);
+    if (p_it == partition_assignments.end()) {
+        return std::nullopt;
+    }
+
+    return p_it->second;
+}
+
+bool fetcher::is_consistent_fetcher_epoch(
+  const model::topic& topic,
+  model::partition_id partition_id,
+  const topic_partition_map& epochs) {
+    // not found in epochs -> inconsistent
+    // not found in assignments -> inconsistent
+    // epochs fetcher epoch != assignments fetcher epoch -> inconsistent
+    // epochs fetcher epoch == assignments fetcher epoch -> consistent
+
+    auto maybe_epoch_set = find_epoch_set(topic, partition_id, epochs);
+
+    if (!maybe_epoch_set) {
+        return false;
+    }
+    auto epoch_set = *maybe_epoch_set;
+
+    auto maybe_fetch_state = find_fetcher_state(topic, partition_id);
+    if (!maybe_fetch_state) {
+        return false;
+    }
+
+    auto& fetch_state = maybe_fetch_state->get();
+
+    return fetch_state.fetcher_epoch == epoch_set.fetcher_epoch;
+}
+
 fmt::iterator fetch_session_state::format_to(fmt::iterator it) const {
     return fmt::format_to(
       it,
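[Editor's aside: is_consistent_fetcher_epoch reduces to a small truth table: a response survives only if its partition was snapshotted at request time, is still assigned, and both sides carry the same fetcher epoch. The rule in isolation, with toy std::optional epochs standing in for the real lookups:]

    #include <cassert>
    #include <cstdint>
    #include <optional>

    bool is_consistent(
      std::optional<std::uint64_t> snapshotted,
      std::optional<std::uint64_t> assigned) {
        // missing on either side -> inconsistent; otherwise compare epochs
        return snapshotted && assigned && *snapshotted == *assigned;
    }

    int main() {
        assert(is_consistent(1, 1));             // assignment unchanged
        assert(!is_consistent(1, 2));            // reassigned mid-fetch
        assert(!is_consistent(std::nullopt, 1)); // response never requested
        assert(!is_consistent(1, std::nullopt)); // unassigned mid-fetch
        return 0;
    }
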
diff --git a/src/v/kafka/client/direct_consumer/fetcher.h b/src/v/kafka/client/direct_consumer/fetcher.h
index c2d6fd7bcc132..14231ebe136c0 100644
--- a/src/v/kafka/client/direct_consumer/fetcher.h
+++ b/src/v/kafka/client/direct_consumer/fetcher.h
@@ -27,6 +27,7 @@

 #include

+#include
 #include

 namespace kafka::client {
@@ -165,7 +166,9 @@ class fetcher {
      * update the fetch offsets for the already assigned partitions.
      */
     ss::future<> assign_partition(
-      model::topic_partition_view, std::optional);
+      model::topic_partition_view,
+      std::optional,
+      subscription_epoch);
     /**
      * Unassign partition from the fetcher, it will stop fetching data for the
      * partition and remove it from the fetcher state. After the unassignment
@@ -185,15 +188,38 @@
     void toggle_sessions(fetch_sessions_enabled);

 private:
-    using assignment_epoch = named_type;
+    using fetcher_epoch = named_type;

     struct partition_fetch_state {
+        partition_fetch_state() = delete;
+        partition_fetch_state(
+          model::partition_id partition_id,
+          std::optional fetch_offset,
+          fetcher_epoch fetcher_epoch,
+          subscription_epoch subscription_epoch) noexcept
+          : partition_id{partition_id}
+          , fetch_offset{fetch_offset}
+          , high_watermark{std::nullopt}
+          , current_leader_epoch{kafka::invalid_leader_epoch}
+          , fetcher_epoch{fetcher_epoch}
+          , incremental_include{true}
+          , subscription_epoch{subscription_epoch} {}
+
+        partition_fetch_state(const partition_fetch_state&) = default;
+        partition_fetch_state(partition_fetch_state&&) = default;
+        partition_fetch_state& operator=(const partition_fetch_state&)
+          = default;
+        partition_fetch_state& operator=(partition_fetch_state&&) = default;
+        ~partition_fetch_state() = default;
+
         model::partition_id partition_id;
         std::optional fetch_offset;
         std::optional high_watermark;
-        leader_epoch current_leader_epoch{kafka::invalid_leader_epoch};
-        assignment_epoch assignment_epoch{0};
-        bool incremental_include{false};
-
+        leader_epoch current_leader_epoch;
+        // version of assignment to this fetcher
+        fetcher_epoch fetcher_epoch;
+        bool incremental_include;
+        // direct_consumer's subscription epoch
+        subscription_epoch subscription_epoch;
         bool include_in_fetch_request() const {
             return fetch_offset.has_value();
         }
@@ -210,8 +236,29 @@
               && to_forget.empty();
         }
     };
+
+    struct epoch_set {
+        epoch_set() = delete;
+
+        epoch_set(
+          fetcher_epoch fetcher_epoch,
+          subscription_epoch subscription_epoch) noexcept
+          : fetcher_epoch{fetcher_epoch}
+          , subscription_epoch{subscription_epoch} {}
+
+        epoch_set(const epoch_set&) = default;
+        epoch_set(epoch_set&&) = default;
+        epoch_set& operator=(const epoch_set&) = default;
+        epoch_set& operator=(epoch_set&&) = default;
+
+        ~epoch_set() = default;
+
+        fetcher_epoch fetcher_epoch;
+        subscription_epoch subscription_epoch;
+    };
+
     struct partitions_with_epoch {
-        topic_partition_map assignment_epochs;
+        topic_partition_map snapshotted_epochs;
         chunked_vector partitions;
     };
     struct fetch_response_content {
@@ -221,10 +268,22 @@
         kafka::fetch_session_id session_id{0};
     };

-    static std::optional find_assignment_epoch(
+    // given a topic and partition, fetch the set of epochs if they exist
+    // this is frequently used to check the assignment epoch of partition data
+    // within a fetch response
+    static std::optional find_epoch_set(
      const model::topic& topic,
      model::partition_id partition,
-      const topic_partition_map& epochs);
+      const topic_partition_map& epochs);
+
+    std::optional>
+    find_fetcher_state(
+      const model::topic& topic, model::partition_id partition);
+
+    bool is_consistent_fetcher_epoch(
+      const model::topic& topic,
+      model::partition_id partition_id,
+      const topic_partition_map& epochs);

     ss::future get_fetch_request_version() const;
     ss::future get_list_offsets_request_version() const;
@@ -232,14 +291,14 @@
     ss::future collect_partitions();
     ss::future maybe_initialise_fetch_offsets(
       const chunked_vector&,
-      const topic_partition_map& epochs);
+      const topic_partition_map& epochs);
     ss::future make_fetch_request(const chunked_vector&);

     ss::future>
     process_fetch_response(
       fetch_response resp,
-      const topic_partition_map& epochs,
+      const topic_partition_map& epochs,
       const chunked_vector& partitions);
     /**
      * Returns false if the partition was not found or the fetch offset was
@@ -247,18 +306,14 @@
      * This indicates that the fetch response should be ignored.
      */
     bool maybe_update_fetch_offset(
-      const model::topic&,
-      model::partition_id,
-      kafka::offset,
-      kafka::offset,
-      std::optional);
+      const model::topic&, model::partition_id, kafka::offset, kafka::offset);

     ss::future>>
     do_list_offsets(list_offsets_request);

     data_queue& queue();
     prefix_logger& logger();
-    assignment_epoch next_epoch() { return ++_epoch; }
+    fetcher_epoch next_epoch() { return ++_epoch; }

     void reset_partition_offset(model::topic_partition_view);

@@ -270,7 +325,7 @@
     ss::condition_variable _partitions_updated;
     ss::gate _gate;
     mutex _state_lock;
-    assignment_epoch _epoch{0};
+    fetcher_epoch _epoch{0};
     ss::abort_source _as;
 };
 } // namespace kafka::client
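[Editor's aside: fetcher_epoch and subscription_epoch are distinct named_type aliases on purpose. The two counters advance independently (per-fetcher reassignment versus consumer-level subscription changes), and distinct strong types keep them from being compared or mixed by accident. A minimal stand-in for the named_type utility, not Redpanda's actual implementation:]

    #include <cstdint>

    template<typename T, typename Tag>
    struct strong_typedef {
        T value{};
        strong_typedef& operator++() {
            ++value;
            return *this;
        }
        bool operator==(const strong_typedef&) const = default;
    };

    using fetcher_epoch_t = strong_typedef<std::uint64_t, struct fetcher_tag>;
    using subscription_epoch_t
      = strong_typedef<std::uint64_t, struct subscription_tag>;

    int main() {
        fetcher_epoch_t f{};
        subscription_epoch_t s{};
        ++f;
        ++s;
        // f == s;  // ill-formed: the tag types differ, so no operator== exists
        return f == fetcher_epoch_t{1} ? 0 : 1;
    }
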
diff --git a/src/v/kafka/client/direct_consumer/tests/BUILD b/src/v/kafka/client/direct_consumer/tests/BUILD
index 4eb14c11e1096..f03cedf8da2fb 100644
--- a/src/v/kafka/client/direct_consumer/tests/BUILD
+++ b/src/v/kafka/client/direct_consumer/tests/BUILD
@@ -28,6 +28,7 @@ redpanda_cc_gtest(
         "//src/v/kafka/client/direct_consumer",
         "//src/v/kafka/client/test:cluster_mock",
         "//src/v/kafka/protocol",
+        "//src/v/model",
         "//src/v/model/tests:random",
         "//src/v/test_utils:gtest",
         "@googletest//:gtest",
diff --git a/src/v/kafka/client/direct_consumer/tests/direct_consumer_test.cc b/src/v/kafka/client/direct_consumer/tests/direct_consumer_test.cc
index 05a2e8beb6d90..96ea3b76de6ea 100644
--- a/src/v/kafka/client/direct_consumer/tests/direct_consumer_test.cc
+++ b/src/v/kafka/client/direct_consumer/tests/direct_consumer_test.cc
@@ -15,6 +15,7 @@
 #include "kafka/client/test/cluster_mock.h"
 #include "kafka/client/types.h"
 #include "kafka/protocol/types.h"
+#include "model/fundamental.h"
 #include "model/tests/random_batch.h"
 #include "test_utils/async.h"
 #include "test_utils/test.h"
@@ -468,6 +469,85 @@ TEST_F(consumer_test_mock, TestLeadershipChange) {
     });
 }

+/**
+Checks stale subscription filtering.
+1. replicate [0,9], [10,19], [20,29]
+2. assign ntp at 0 to direct_consumer
+3. let dc fetch data
+4. unassign ntp
+5. assign ntp at 10
+6. check the first call to fetch_next is an empty fetch (was filtered)
+7. drain all fetches, check we see only 20 records
+*/
+TEST_F(consumer_test_mock, TestEpochFiltering) {
+    prepare_cluster();
+    model::topic test_topic("panda-test");
+    cluster_mock.add_topic(test_topic, 1, 3);
+    topic_partition_map> all_batches;
+
+    // 1. replicate [0,9], [10,19], [20,29]
+    make_data_available(test_topic, 0, 10);
+    make_data_available(test_topic, 0, 10);
+    make_data_available(test_topic, 0, 10);
+
+    auto client_cluster = create_client_cluster();
+    client_cluster.start().get();
+    direct_consumer consumer(client_cluster, direct_consumer::configuration{});
+    consumer.start().get();
+    auto deferred_stop = ss::defer([&] {
+        consumer.stop().get();
+        client_cluster.stop().get();
+    });
+
+    auto get_assignment = [test_topic](kafka::offset begin) {
+        chunked_vector assignment;
+        assignment.push_back({
+          .topic = test_topic,
+        });
+        // only partition 0 is assigned
+        assignment.back().partitions.push_back(
+          partition_assignment{
+            .partition_id = model::partition_id(0),
+            .next_offset = kafka::offset(begin),
+          });
+        return assignment;
+    };
+
+    // 2. assign ntp at 0 to direct_consumer
+    consumer.assign_partitions(get_assignment(kafka::offset{0})).get();
+
+    // 3. let dc fetch data
+    // TODO we need a way to force iterations to occur
+    ss::sleep(2s).get();
+
+    // 4&5. unassign & reassign with offset 10 instead
+    consumer.unassign_topics({test_topic}).get();
+    consumer.assign_partitions(get_assignment(kafka::offset{10})).get();
+
+    // 6. check the first call to fetch_next is an empty fetch (was filtered)
+    auto filtered_fetch_result = consumer.fetch_next(5s).get();
+    ASSERT_TRUE(filtered_fetch_result.has_value());
+    auto filtered_fetch = std::move(filtered_fetch_result).value();
+    int found_partition_count{0};
+
+    for (auto& topic_data : filtered_fetch) {
+        for (auto& partition_data : topic_data.partitions) {
+            std::ignore = partition_data;
+            ++found_partition_count;
+        }
+    }
+    ASSERT_EQ(found_partition_count, 0);
+
+    // 7. drain all fetches, check we see only 20 records
+    RPTEST_REQUIRE_EVENTUALLY(10s, [&] {
+        return fetch_and_append_to_map(all_batches, consumer).then([&] {
+            auto& ntp_batches = all_batches[test_topic][model::partition_id(0)];
+            return ntp_batches.size() == 20
+                   && ntp_batches.back().last_offset() == model::offset(29);
+        });
+    });
+}
+
 TEST_F(consumer_test_mock, TestOffsetResetPolicy) {
     prepare_cluster();
     model::topic test_topic("panda-test");