Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions src/v/kafka/client/direct_consumer/api_types.h
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,13 @@ struct source_partition_offsets {
kafka::offset last_stable_offset{-1};
// The timestamp that the fetch response was received by the client
ss::lowres_clock::time_point last_offset_update_timestamp{};

// check all except timestamp
bool operator==(const source_partition_offsets& other) const {
return log_start_offset == other.log_start_offset
&& high_watermark == other.high_watermark
&& last_stable_offset == other.last_stable_offset;
}
};

struct partition_offset {
Expand Down
190 changes: 152 additions & 38 deletions src/v/kafka/client/direct_consumer/direct_consumer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
#include "ssx/future-util.h"

#include <algorithm>
#include <chrono>
#include <functional>

namespace kafka::client {

Expand All @@ -37,29 +39,48 @@ direct_consumer::direct_consumer(

ss::future<fetches>
direct_consumer::fetch_next(std::chrono::milliseconds timeout) {
static constexpr auto jitter = std::chrono::milliseconds{1};
if (!_started) [[unlikely]] {
throw std::runtime_error("Direct consumer is not started");
}
auto holder = _gate.hold();

auto deadline = ss::lowres_clock::now() + timeout;

try {
auto maybe_response_to_filter = co_await _fetched_data_queue->pop(
timeout);
if (maybe_response_to_filter.has_error()) {
while (ss::lowres_clock::now() < deadline) {
auto timeout_remaining
= std::chrono::duration_cast<std::chrono::milliseconds>(
deadline - ss::lowres_clock::now() + jitter);

auto maybe_response_to_filter = co_await _fetched_data_queue->pop(
timeout_remaining);
if (maybe_response_to_filter.has_error()) {
co_return maybe_response_to_filter;
}

// the remainder is synchronous, no lock is needed
auto& response_to_filter = maybe_response_to_filter.value();
filter_stale_subscriptions(response_to_filter);
update_start_offsets(response_to_filter);

// if the filters have removed everything, there are no updates.
// rejoin the queue
if (response_to_filter.empty()) {
continue;
}

co_return maybe_response_to_filter;
}

// the remainder is synchronous, no lock is needed
auto& response_to_filter = maybe_response_to_filter.value();
filter_stale_subscriptions(response_to_filter);
co_return maybe_response_to_filter;

} catch (ss::condition_variable_timed_out&) {
co_return chunked_vector<fetched_topic_data>{};
}
co_return chunked_vector<fetched_topic_data>{};
}

void direct_consumer::filter_stale_subscriptions(
chunked_vector<fetched_topic_data>& responses_to_filter) {
chunked_vector<fetched_topic_data>& responses_to_filter) const {
// for each topic, remove stale partitions
for (auto& topic_data : responses_to_filter) {
auto& partition_data = topic_data.partitions;
Expand Down Expand Up @@ -103,6 +124,98 @@ void direct_consumer::filter_stale_subscriptions(
responses_to_filter.end() - non_empty_subsegment.size());
}

// Reconciles each subscription's cached source offsets with the offsets
// reported in a fetch response, and prunes entries that carry no update.
//
// A partition counts as "not an update" when its offsets equal the cached
// ones (operator== ignores the timestamp) AND it carries no record data;
// such partitions are erased from `fetched_data` after only refreshing the
// cached bookkeeping timestamp. Topics left with no partitions are erased as
// well. Offsets moving backward are logged (error for log-start/LSO, warn
// for high watermark) but the new values are still accepted.
//
// NOTE(review): both std::ranges::partition predicates have side effects on
// subscription state. std::partition applies the predicate exactly once per
// element, so each subscription is updated exactly once, but the order of
// application is unspecified — safe here because the per-element updates are
// independent of one another.
void direct_consumer::update_start_offsets(
  chunked_vector<fetched_topic_data>& fetched_data) {
    // Gather topics that still have partitions at the front; the returned
    // subrange is the tail of now-empty topics, erased at the end.
    auto empty_topic_subsegment = std::ranges::partition(
      fetched_data, [this](auto& topic_data) {
          // Gather updated partitions at the front; the returned subrange is
          // the tail of non-updated partitions (no data and no offset
          // change), which is erased below.
          auto non_updated_subsegment = std::ranges::partition(
            topic_data.partitions, [this, &topic_data](auto& partition_data) {
                auto maybe_subscription = find_subscription(
                  topic_data.topic, partition_data.partition_id);
                vassert(
                  maybe_subscription.has_value(),
                  "invoke only after filtering, all subscriptions should be "
                  "consistent");
                auto& subscription = maybe_subscription->get();
                // Candidate snapshot from this fetch response, stamped with
                // the local receive time.
                source_partition_offsets spo{
                  .log_start_offset = partition_data.start_offset,
                  .high_watermark = partition_data.high_watermark,
                  .last_stable_offset = partition_data.last_stable_offset,
                  .last_offset_update_timestamp = ss::lowres_clock::now()};

                // not an update, screen it out — but still refresh the
                // cached timestamp so staleness tracking sees this response
                if (
                  spo == subscription.last_known_source_offsets
                  && partition_data.data.size() == 0) {
                    subscription.last_known_source_offsets
                      .last_offset_update_timestamp
                      = ss::lowres_clock::now();
                    return false;
                }

                // Backward movement checks: log anomalies, then fall through
                // and accept the new snapshot regardless.
                if (
                  subscription.last_known_source_offsets.log_start_offset
                  > spo.log_start_offset) {
                    vlog(
                      _cluster->logger().error,
                      "log start offset should never move backward, current: "
                      "{}, "
                      "found: {}",
                      subscription.last_known_source_offsets.log_start_offset,
                      spo.log_start_offset);
                }
                if (
                  subscription.last_known_source_offsets.high_watermark
                  > spo.high_watermark) {
                    vlog(
                      _cluster->logger().warn,
                      "high watermark should not normally move backward, "
                      "current: "
                      "{}, found: {}",
                      subscription.last_known_source_offsets.high_watermark,
                      spo.high_watermark);
                }
                if (
                  subscription.last_known_source_offsets.last_stable_offset
                  > spo.last_stable_offset) {
                    vlog(
                      _cluster->logger().error,
                      "last known stable should never move backward, "
                      "current: {}, found: {}",
                      subscription.last_known_source_offsets.last_stable_offset,
                      spo.last_stable_offset);
                }
                subscription.last_known_source_offsets = spo;
                return true;
            });

          // no need to update sizes (total_bytes): a partition is only
          // erased when it carried no data, so the byte count is unaffected
          topic_data.partitions.erase_to_end(non_updated_subsegment.begin());

          // gather the empty topics to the end s.t. they can be filtered out
          return !topic_data.partitions.empty();
      });

    fetched_data.erase_to_end(empty_topic_subsegment.begin());
}

// Removes topics whose fetch responses carried no payload bytes.
void direct_consumer::filter_empty(
  chunked_vector<fetched_topic_data>& responses_to_filter) const {
    // Gather topics with data at the front; the returned subrange is the
    // zero-byte tail, which is then erased in one shot.
    auto zero_byte_tail = std::ranges::partition(
      responses_to_filter, [](fetched_topic_data& topic_data) {
          return topic_data.total_bytes != 0;
      });

    responses_to_filter.erase_to_end(zero_byte_tail.begin());
}

void direct_consumer::update_configuration(configuration cfg) {
vlog(
_cluster->logger().info,
Expand All @@ -118,15 +231,10 @@ void direct_consumer::update_configuration(configuration cfg) {

std::optional<source_partition_offsets>
direct_consumer::get_source_offsets(model::topic_partition_view tp) const {
auto it = _subscriptions.find(tp.topic);
if (it == _subscriptions.end()) {
return std::nullopt;
}
auto p_it = it->second.find(tp.partition);
if (p_it == it->second.end()) {
return std::nullopt;
}
return p_it->second.last_known_source_offsets;
return find_subscription(tp.topic, tp.partition)
.transform([](std::reference_wrapper<const subscription> sub) {
return sub.get().last_known_source_offsets;
});
}

ss::future<> direct_consumer::update_fetchers(
Expand Down Expand Up @@ -231,7 +339,24 @@ fetcher& direct_consumer::get_fetcher(model::node_id id) {
return *it->second;
}

std::optional<subscription_epoch> direct_consumer::find_subscription_epoch(
// Looks up the subscription state for (topic, partition_id). Returns an
// empty optional when the consumer holds no such subscription.
std::optional<std::reference_wrapper<const direct_consumer::subscription>>
direct_consumer::find_subscription(
  const model::topic& topic, model::partition_id partition_id) const {
    if (auto t_it = _subscriptions.find(topic);
        t_it != _subscriptions.end()) {
        const auto& partitions = t_it->second;
        if (auto p_it = partitions.find(partition_id);
            p_it != partitions.end()) {
            return p_it->second;
        }
    }
    return std::nullopt;
}

std::optional<std::reference_wrapper<direct_consumer::subscription>>
direct_consumer::find_subscription(
const model::topic& topic, model::partition_id partition_id) {
auto t_it = _subscriptions.find(topic);
if (t_it == _subscriptions.end()) {
Expand All @@ -243,7 +368,14 @@ std::optional<subscription_epoch> direct_consumer::find_subscription_epoch(
if (p_it == p_map.end()) {
return std::nullopt;
}
return p_it->second.subscription_epoch;
return p_it->second;
}

// Returns the epoch of the (topic, partition_id) subscription, or an empty
// optional when no such subscription exists.
std::optional<subscription_epoch> direct_consumer::find_subscription_epoch(
  const model::topic& topic, model::partition_id partition_id) const {
    auto maybe_sub = find_subscription(topic, partition_id);
    if (!maybe_sub.has_value()) {
        return std::nullopt;
    }
    return maybe_sub->get().subscription_epoch;
}

ss::future<> direct_consumer::start() {
Expand Down Expand Up @@ -350,24 +482,6 @@ void direct_consumer::on_metadata_update(const metadata_update&) {
ssx::spawn_with_gate(_gate, [this] { return handle_metadata_update(); });
}

// Replaces the cached source offsets for `tp`'s subscription with `offsets`,
// but only when `offsets` was received more recently (by its
// last_offset_update_timestamp) than the currently cached snapshot. Does
// nothing when no subscription exists for the topic/partition.
void direct_consumer::maybe_update_source_partition_offsets(
  model::topic_partition_view tp, source_partition_offsets offsets) {
    auto it = _subscriptions.find(tp.topic);
    if (it == _subscriptions.end()) {
        // Not subscribed to this topic.
        return;
    }
    auto p_it = it->second.find(tp.partition);
    if (p_it == it->second.end()) {
        // Not subscribed to this partition.
        return;
    }
    auto& sub = p_it->second;
    // Monotonic update: keep whichever snapshot was received most recently.
    if (
      offsets.last_offset_update_timestamp
      > sub.last_known_source_offsets.last_offset_update_timestamp) {
        sub.last_known_source_offsets = offsets;
    }
}

direct_consumer::~direct_consumer() = default;

std::ostream&
Expand Down
16 changes: 12 additions & 4 deletions src/v/kafka/client/direct_consumer/direct_consumer.h
Original file line number Diff line number Diff line change
Expand Up @@ -170,14 +170,22 @@ class direct_consumer {

fetcher& get_fetcher(model::node_id id);

std::optional<subscription_epoch> find_subscription_epoch(
std::optional<std::reference_wrapper<const subscription>> find_subscription(
const model::topic& topic, model::partition_id partition_id) const;

std::optional<std::reference_wrapper<subscription>> find_subscription(
const model::topic& topic, model::partition_id partition_id);

std::optional<subscription_epoch> find_subscription_epoch(
const model::topic& topic, model::partition_id partition_id) const;

void filter_stale_subscriptions(
chunked_vector<fetched_topic_data>& responses_to_filter);
chunked_vector<fetched_topic_data>& responses_to_filter) const;

void update_start_offsets(chunked_vector<fetched_topic_data>& fetched_data);

void maybe_update_source_partition_offsets(
model::topic_partition_view tp, source_partition_offsets offsets);
void
filter_empty(chunked_vector<fetched_topic_data>& responses_to_filter) const;

cluster* _cluster;

Expand Down
Loading
Loading