Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions src/v/kafka/client/direct_consumer/api_types.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@

#include <seastar/util/bool_class.hh>
namespace kafka::client {

using subscription_epoch = named_type<uint64_t, struct subscription_epoch_tag>;

enum class offset_reset_policy : int8_t {
// reset to the earliest offset
earliest,
Expand Down Expand Up @@ -63,6 +66,8 @@ struct fetched_partition_data {
chunked_vector<model::record_batch> data;
kafka::error_code error = kafka::error_code::none;
std::optional<chunked_vector<aborted_transaction>> aborted_transactions;
subscription_epoch subscription_epoch;
size_t size_bytes;
};

struct fetched_topic_data {
Expand Down
109 changes: 100 additions & 9 deletions src/v/kafka/client/direct_consumer/direct_consumer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,17 @@
*/
#include "kafka/client/direct_consumer/direct_consumer.h"

#include "kafka/client/direct_consumer/api_types.h"
#include "kafka/client/direct_consumer/data_queue.h"
#include "kafka/client/direct_consumer/fetcher.h"
#include "model/validation.h"
#include "ssx/future-util.h"

#include <seastar/util/defer.hh>

#include <algorithm>
#include <iterator>

namespace kafka::client {

direct_consumer::direct_consumer(cluster& cluster, configuration cfg)
Expand All @@ -31,12 +37,75 @@ direct_consumer::fetch_next(std::chrono::milliseconds timeout) {
}
auto holder = _gate.hold();
try {
co_return co_await _fetched_data_queue->pop(timeout);
auto maybe_response_to_filter = co_await _fetched_data_queue->pop(
timeout);
if (maybe_response_to_filter.has_error()) {
co_return maybe_response_to_filter;
}

// the remainder is synchronous, no lock is needed

auto& response_to_filter = maybe_response_to_filter.value();
filter_stale_subscriptions(response_to_filter);
co_return maybe_response_to_filter;

} catch (ss::condition_variable_timed_out&) {
co_return chunked_vector<fetched_topic_data>{};
}
}

void direct_consumer::filter_stale_subscriptions(
  chunked_vector<fetched_topic_data>& responses_to_filter) {
    // Drop fetched partition data whose subscription epoch no longer matches
    // the currently-assigned epoch (or whose partition is no longer assigned
    // at all): such data was fetched against a stale assignment. Topics left
    // with no partitions are removed as well.
    //
    // NOTE(review): std::ranges::partition does not preserve relative order;
    // assuming cross-partition ordering within a response is not a contract
    // callers rely on — confirm.
    for (auto& topic_data : responses_to_filter) {
        auto& partition_data = topic_data.partitions;

        // Move kept partitions to the front. std::ranges::partition returns
        // the subrange of elements for which the predicate returned false,
        // i.e. the stale tail.
        auto stale_tail = std::ranges::partition(
          partition_data, [this, &topic_data](const fetched_partition_data& data) {
              // A dropped partition's bytes must not count towards the topic
              // total; arm the decrement up front and cancel it if the
              // partition turns out to be kept.
              const auto partition_size = data.size_bytes;
              auto decrement_holder = ss::defer([&topic_data, partition_size] {
                  topic_data.total_bytes -= partition_size;
              });

              auto maybe_current_subscription_epoch = find_subscription_epoch(
                topic_data.topic, data.partition_id);

              vlog(
                _cluster->logger().debug,
                "current subscription epoch: {}, found request epoch: {}",
                maybe_current_subscription_epoch,
                data.subscription_epoch);

              // Partition is no longer assigned: treat as stale.
              if (!maybe_current_subscription_epoch) {
                  return false;
              }
              const bool is_not_stale = data.subscription_epoch
                                        == *maybe_current_subscription_epoch;
              if (is_not_stale) {
                  // Kept: its bytes remain part of the topic total.
                  decrement_holder.cancel();
              }
              return is_not_stale;
          });

        // Erase the stale tail in place.
        partition_data.erase_to_end(stale_tail.begin());
    }

    // Remove topics whose partitions were all filtered out; the returned
    // subrange is the group of now-empty topics at the back.
    auto empty_tail = std::ranges::partition(
      responses_to_filter, [](const fetched_topic_data& topic_data) {
          return !topic_data.partitions.empty();
      });

    responses_to_filter.erase_to_end(empty_tail.begin());
}

void direct_consumer::update_configuration(configuration cfg) {
vlog(
_cluster->logger().info,
Expand Down Expand Up @@ -103,13 +172,17 @@ ss::future<> direct_consumer::update_fetchers(
sub.current_fetcher = *leader_id;
auto& new_fetcher = get_fetcher(*leader_id);
co_await new_fetcher.assign_partition(
model::topic_partition_view(topic, p_id), sub.fetch_offset);
model::topic_partition_view(topic, p_id),
sub.fetch_offset,
sub.subscription_epoch);

} else if (sub.fetch_offset) {
// If the fetch offset is set, update it
auto& current = get_fetcher(*sub.current_fetcher);
co_await current.assign_partition(
model::topic_partition_view(topic, p_id), sub.fetch_offset);
model::topic_partition_view(topic, p_id),
sub.fetch_offset,
sub.subscription_epoch);
}
sub.fetch_offset.reset();
}
Expand Down Expand Up @@ -148,6 +221,21 @@ fetcher& direct_consumer::get_fetcher(model::node_id id) {
return *it->second;
}

std::optional<subscription_epoch> direct_consumer::find_subscription_epoch(
  const model::topic& topic, model::partition_id partition_id) {
    // Look up the epoch recorded for topic/partition_id in _subscriptions.
    // Absence at either level means the ntp is not currently subscribed.
    if (auto topic_it = _subscriptions.find(topic);
        topic_it != _subscriptions.end()) {
        const auto& partitions = topic_it->second;
        if (auto partition_it = partitions.find(partition_id);
            partition_it != partitions.end()) {
            return partition_it->second.subscription_epoch;
        }
    }
    return std::nullopt;
}

ss::future<> direct_consumer::start() {
if (_started) {
co_return;
Expand Down Expand Up @@ -182,14 +270,17 @@ direct_consumer::assign_partitions(chunked_vector<topic_assignment> topics) {
"Invalid topic name: {}, error: {}", t.topic, ec.message()));
}
for (auto& p : t.partitions) {
auto epoch_to_assign = ++epoch;
vlog(
_cluster->logger().trace,
"Assigning partition: {}/{} with offset: {}",
"Assigning partition: {}/{} with offset: {} and epoch {}",
t.topic,
p.partition_id,
p.next_offset);
auto& sub = _subscriptions[t.topic][p.partition_id];
sub.fetch_offset = p.next_offset;
p.next_offset,
epoch_to_assign);
_subscriptions[t.topic].insert_or_assign(
p.partition_id,
subscription{std::nullopt, p.next_offset, epoch_to_assign});
}
}
co_await update_fetchers(std::move(lock_holder));
Expand All @@ -207,7 +298,7 @@ direct_consumer::unassign_topics(chunked_vector<model::topic> topics) {
}
removals[topic].reserve(state_it->second.size());
for (const auto& [p_id, sub] : state_it->second) {
removals[topic][p_id] = sub;
removals[topic].insert_or_assign(p_id, sub);
}
_subscriptions.erase(topic);
}
Expand All @@ -227,7 +318,7 @@ ss::future<> direct_consumer::unassign_partitions(
auto p_it = state_it->second.find(tp.partition);

if (p_it != state_it->second.end()) {
removals[tp.topic][tp.partition] = p_it->second;
removals[tp.topic].insert_or_assign(tp.partition, p_it->second);
state_it->second.erase(p_it);
}
if (state_it->second.empty()) {
Expand Down
26 changes: 26 additions & 0 deletions src/v/kafka/client/direct_consumer/direct_consumer.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
#include "container/chunked_hash_map.h"
#include "kafka/client/cluster.h"
#include "kafka/client/direct_consumer/api_types.h"
#include "model/fundamental.h"

namespace kafka {
struct metadata_response_data;
Expand Down Expand Up @@ -125,8 +126,24 @@ class direct_consumer {

private:
struct subscription {
subscription() = delete;
subscription(
std::optional<model::node_id> current_fetcher,
std::optional<kafka::offset> fetch_offset,
subscription_epoch subscription_epoch) noexcept
: current_fetcher{current_fetcher}
, fetch_offset{fetch_offset}
, subscription_epoch{subscription_epoch} {}

subscription(const subscription&) = default;
subscription(subscription&&) = default;
subscription& operator=(const subscription&) = default;
subscription& operator=(subscription&&) = default;
~subscription() = default;

std::optional<model::node_id> current_fetcher;
std::optional<kafka::offset> fetch_offset;
subscription_epoch subscription_epoch;
};
friend class fetcher;
void on_metadata_update(const metadata_response_data&);
Expand All @@ -138,6 +155,12 @@ class direct_consumer {

fetcher& get_fetcher(model::node_id id);

std::optional<subscription_epoch> find_subscription_epoch(
const model::topic& topic, model::partition_id partition_id);

void filter_stale_subscriptions(
chunked_vector<fetched_topic_data>& responses_to_filter);

cluster* _cluster;

offset_reset_policy _reset_policy
Expand All @@ -152,6 +175,9 @@ class direct_consumer {
std::unique_ptr<data_queue> _fetched_data_queue;
ss::condition_variable _data_available;

// tracks the version of a subscribed ntp
subscription_epoch epoch{0};

cluster::callback_id _metadata_callback_id;
bool _started = false;
ss::gate _gate;
Expand Down
Loading
Loading