8 changes: 7 additions & 1 deletion src/v/kafka/client/direct_consumer/direct_consumer.cc
@@ -53,6 +53,7 @@ direct_consumer::update_fetchers(topic_partition_map<subscription> removals) {
co_return;
}
auto holder = _gate.hold();
auto subscription_lock_holder = co_await _subscriptions_lock.get_units();
/**
* Unassign partitions from fetchers that are no longer needed.
*/
@@ -149,7 +150,12 @@ ss::future<> direct_consumer::start() {
ss::future<> direct_consumer::stop() {
_cluster->unregister_metadata_cb(_metadata_callback_id);
_fetched_data_queue->stop();
co_await _gate.close();

// close the gate and break the lock together: waiters on the lock are
// woken to fail fast instead of stop() draining every pending operation
auto f = _gate.close();
_subscriptions_lock.broken();
co_await std::move(f);

co_await ss::parallel_for_each(
_broker_fetchers, [](auto& pair) { return pair.second->stop(); });
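
The reworked stop() follows a common Seastar shutdown pattern: start closing the gate, then break the lock before awaiting the close, so fibers parked in get_units() fail fast with broken_semaphore and release their gate holders instead of stop() waiting behind them. A minimal sketch of the pattern, assuming Seastar's ss::gate and the mutex wrapper from utils/mutex.h (worker and do_update are illustrative names):

// Requires <seastar/core/coroutine.hh>, <seastar/core/gate.hh> and
// "utils/mutex.h"; worker has members: ss::gate _gate; mutex _lock;
ss::future<> worker::update() {
    auto holder = _gate.hold(); // keeps stop() from completing early
    // throws ss::broken_semaphore once stop() has called broken()
    auto units = co_await _lock.get_units();
    co_await do_update(); // the serialized critical section
}

ss::future<> worker::stop() {
    auto closed = _gate.close(); // begin closing, but do not await yet
    _lock.broken();              // wake lock waiters; they abort and unwind
    co_await std::move(closed);  // remaining holders can now drain
}
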
4 changes: 4 additions & 0 deletions src/v/kafka/client/direct_consumer/direct_consumer.h
@@ -13,6 +13,7 @@
#include "container/chunked_hash_map.h"
#include "kafka/client/cluster.h"
#include "kafka/client/direct_consumer/api_types.h"
#include "utils/mutex.h"

namespace kafka {
struct metadata_response_data;
@@ -143,6 +144,9 @@ class direct_consumer {
configuration _config;

topic_partition_map<subscription> _subscriptions;
// serialize updates to _subscriptions
mutex _subscriptions_lock{"direct_consumer::_subscriptions_lock"};

chunked_hash_map<model::node_id, std::unique_ptr<fetcher>> _broker_fetchers;
std::unique_ptr<data_queue> _fetched_data_queue;
ss::condition_variable _data_available;
49 changes: 43 additions & 6 deletions src/v/kafka/client/direct_consumer/fetcher.cc
@@ -130,17 +130,43 @@ bool fetcher::maybe_update_fetch_offset(
model::partition_id partition_id,
kafka::offset last_received,
assignment_epoch assignment_epoch) {
// log to confirm this path is exercised
vlog(
logger().info,
"[broker: {}] {}/{} maybe_update_fetch_offset assignment_epoch: {}",
_id,
topic,
partition_id,
assignment_epoch);

auto t_it = _partitions.find(topic);
// topic not found
if (t_it == _partitions.end()) {
vlog(
logger().info,
"[broker: {}] {}/{} topic not found",
_id,
topic,
partition_id);
return false;
}

auto p_it = t_it->second.find(partition_id);
// partition not found
if (p_it == t_it->second.end()) {
vlog(
logger().info,
"[broker: {}] {}/{} partition not found",
_id,
topic,
partition_id);
return false;
}

// assignment mismatch
if (p_it->second.assignment_epoch != assignment_epoch) {
vlog(
logger().trace,
logger().info,
"[broker: {}] Ignoring {}/{} reply, assignment epoch changed, "
"request epoch: {}, current epoch: {}",
_id,
@@ -151,8 +177,9 @@
return false;
}

// success
vlog(
logger().trace,
logger().info,
"[broker: {}] Updating {}/{} fetch offset to {}",
_id,
topic,
@@ -345,12 +372,18 @@ fetcher::process_fetch_response(
part_data.data = co_await reader_to_chunked_vector(
std::move(part_response.records.value()));

maybe_update_fetch_offset(
const bool was_updated = maybe_update_fetch_offset(
topic_data.topic,
part_data.partition_id,
model::offset_cast(part_data.data.back().last_offset()),
find_assignment_epoch(
topic_data.topic, part_data.partition_id, epochs));

// if the offset was not updated (epoch changed or the partition is no
// longer assigned), do not put this data on the queue
if (!was_updated) {
continue;
}
}
topic_data.partitions.push_back(std::move(part_data));
}
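
The was_updated guard exists because a fetch reply can race with reassignment: by the time a response arrives, the partition may have been unassigned or reassigned under a newer epoch, in which case its data must be dropped instead of enqueued. A self-contained sketch of the guard, with plain integers standing in for kafka::offset and assignment_epoch:

#include <cstdint>

struct partition_state {
    int64_t fetch_offset{0}; // next offset to request
    uint64_t epoch{0};       // bumped on every (re)assignment
};

// Apply a reply only if the partition is still assigned under the epoch
// the request was issued with; a stale reply must not advance the offset
// or reach the consumer's queue.
bool maybe_apply(partition_state* st, uint64_t request_epoch, int64_t last_offset) {
    if (st == nullptr) {
        return false; // topic/partition no longer assigned to this fetcher
    }
    if (st->epoch != request_epoch) {
        return false; // reassigned while the fetch was in flight
    }
    st->fetch_offset = last_offset + 1; // resume after the received batch
    return true;
}
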
@@ -472,6 +505,7 @@ ss::future<kafka::error_code> fetcher::maybe_initialise_fetch_offsets(
partition_offset.offset,
request_epoch,
p_it->second.assignment_epoch);
// TODO: continue?
}
vlog(
logger().info,
@@ -541,14 +575,17 @@ fetcher::unassign_partition(model::topic_partition_view tp_v) {
// partition not found, nothing to unassign
co_return std::nullopt;
}
auto fetch_offset = p_it->second.fetch_offset;

vlog(
logger().debug,
"[broker: {}] Removing partition: {} assignment",
"[broker: {}] Removing partition: {} assignment with offset: {}",
_id,
tp_v);
tp_v,
fetch_offset);

// TODO: for fetch sessions we will need to mark partitions for deletion so
// they can be removed from the fetch session
auto fetch_offset = p_it->second.fetch_offset;
partitions.erase(p_it);
if (partitions.empty()) {
// if there are no partitions left for this topic, remove the topic
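
unassign_partition returns the removed partition's fetch offset (or nullopt when nothing was assigned). Presumably this lets the caller hand the position to whichever fetcher takes the partition over, so consumption resumes in place rather than falling back to the offset reset policy. A hedged sketch of that handoff, with illustrative names and plain types:

#include <cstdint>
#include <map>
#include <optional>

using partition_id = int32_t;

// Remove a partition from one fetcher's map, returning its saved
// position so the new owner can resume from it.
std::optional<int64_t>
unassign(std::map<partition_id, int64_t>& partitions, partition_id pid) {
    auto it = partitions.find(pid);
    if (it == partitions.end()) {
        return std::nullopt; // nothing assigned, nothing to hand off
    }
    auto offset = it->second;
    partitions.erase(it);
    return offset;
}

void move_partition(std::map<partition_id, int64_t>& from,
                    std::map<partition_id, int64_t>& to,
                    partition_id pid,
                    int64_t fallback_start) {
    auto offset = unassign(from, pid);
    to[pid] = offset.value_or(fallback_start); // resume, or start fresh
}
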
8 changes: 8 additions & 0 deletions src/v/kafka/client/direct_consumer/verifier/application.cc
@@ -114,6 +114,14 @@ ss::future<> consumer_runner::do_fetch() {
_total_records += totals.count;
auto last_fetched_offset = model::offset_cast(
fetched_partition_data.data.back().last_offset());

vlog(
v_logger.trace,
"fetched ntp: {}/{}, offset: {}",
fetched_topic_data.topic,
fetched_partition_data.partition_id,
last_fetched_offset);

if (last_fetched_offset <= partition_stats.last_fetched_offset) {
_non_monotonic_fetches++;
vlog(
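
On the verifier side, the invariant under test is that per-partition offsets only move forward: a batch whose last offset is at or below the previous high-water mark counts as a non-monotonic fetch. A self-contained sketch of that bookkeeping, with plain types standing in for the verifier's real ones:

#include <algorithm>
#include <cstdint>
#include <map>
#include <string>
#include <utility>

struct monotonicity_checker {
    // (topic, partition) -> highest last offset seen so far
    std::map<std::pair<std::string, int32_t>, int64_t> high_water;
    uint64_t non_monotonic_fetches{0};

    void on_fetch(const std::string& topic, int32_t partition, int64_t last_offset) {
        auto [it, inserted] = high_water.try_emplace({topic, partition}, last_offset);
        if (inserted) {
            return; // first fetch for this partition
        }
        if (last_offset <= it->second) {
            ++non_monotonic_fetches; // rewound or duplicated data
        }
        it->second = std::max(it->second, last_offset);
    }
};
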
88 changes: 77 additions & 11 deletions tests/rptest/direct_consumer_tests/direct_consumer_test.py
@@ -9,30 +9,80 @@

from typing import Any

import time
import random
import threading
from typing import Callable

from rptest.services.cluster import cluster
from rptest.services.admin import Admin
from ducktape.utils.util import wait_until
from ducktape.tests.test import TestContext
from ducktape.errors import TimeoutError

from rptest.services.direct_consumer_verifier import (
DirectConsumerVerifier, CreateDirectConsumerRequest, BrokerAddress,
DirectConsumerConfiguration, OffsetResetPolicy, IsolationLevel,
AssignPartitionsRequest, TopicAssignment, PartitionAssignment,
GetConsumerStateRequest)
from rptest.utils.node_operations import FailureInjectorBackgroundThread
from rptest.services.kgo_verifier_services import KgoVerifierProducer
from rptest.clients.types import TopicSpec
from rptest.tests.redpanda_test import RedpandaTest


class LoopThread(threading.Thread):
"""Run loop func until graceful cancellation is requested"""
def _loop(self, *args, **kwargs):
while not self.stopped():
self._loop_func(*args, **kwargs)

def __init__(self, *args, loop_func: Callable[[], None], **kwargs):
super(LoopThread, self).__init__(*args, target=self._loop, **kwargs)
self._stop_event = threading.Event()
self._loop_func = loop_func

def stop(self):
self._stop_event.set()

def stopped(self):
return self._stop_event.is_set()


#TODO: This test must be enabled once the direct consumer verifier support
# is added to vtools.
class DirectConsumerVerifierTest(RedpandaTest):
def __init__(self, test_context: TestContext, **kwargs: Any):
super().__init__(test_context, **kwargs)

@cluster(num_nodes=4)
def shuffle_one_leader(self, topic_spec: TopicSpec) -> None:
'''Randomly choose a partition in the given topic spec and transfer its leadership to another replica.'''
namespace = "kafka"
topic_name = topic_spec.name
partition_number = random.randrange(topic_spec.partition_count)
admin = Admin(self.redpanda)
transfer_succeeded = admin.transfer_leadership_to(
namespace=namespace,
topic=topic_name,
partition=partition_number)
self.redpanda.logger.debug(
f"transfer of ntp {namespace}/{topic_name}/{partition_number} success? {transfer_succeeded}"
)

def create_troublemaker_thread(self,
topic_spec: TopicSpec,
thread_name="stream_thread") -> LoopThread:
'''Create a background thread that drives partition leadership transfers.'''
thread = LoopThread(name=thread_name,
loop_func=self.shuffle_one_leader,
args=(topic_spec, ))
return thread

@cluster(num_nodes=5)
def test_basic_consuming_from_topic(self):
topic_name = "test-topic"
msg_count = 200000
msg_count = 2000000
msg_size = 128
client_id = "test-consumer"

@@ -42,15 +92,22 @@ def test_basic_consuming_from_topic(self):

self.client().create_topic(topic_spec)

KgoVerifierProducer.oneshot(self.test_context,
self.redpanda,
topic_name,
msg_size=msg_size,
msg_count=msg_count)
producer = KgoVerifierProducer(self.test_context,
self.redpanda,
topic_name,
msg_size=msg_size,
msg_count=msg_count)
producer.start()

verifier = DirectConsumerVerifier(self.test_context, log_level="DEBUG")
verifier.start()

troublemaker = self.create_troublemaker_thread(topic_spec=topic_spec)
troublemaker.start()

#failure_injector = FailureInjectorBackgroundThread(self.redpanda, self.logger, max_inter_failure_time=10)
#failure_injector.start()

try:
# check if the verifier is alive
verifier.status()
Expand Down Expand Up @@ -92,22 +149,31 @@ def test_basic_consuming_from_topic(self):
state_request = GetConsumerStateRequest(
client_id=client_id, include_partition_states=True)

def check_consumption():
def check_consumption() -> bool:
state = verifier.get_consumer_state(state_request)
self.logger.info(
f"Consumer state: consumed {state.total_consumed_messages} messages"
self.logger.debug(
f"Expecting: {msg_count} Consumer state: consumed {state.total_consumed_messages} messages"
)
return state.total_consumed_messages >= msg_count

wait_until(check_consumption, timeout_sec=60, backoff_sec=2)
wait_until(check_consumption, timeout_sec=600, backoff_sec=2)

final_state = verifier.get_consumer_state(state_request)

# assertions
assert final_state.total_consumed_messages == msg_count, \
f"Expected {msg_count} messages, got {final_state.total_consumed_messages}"

assert int(final_state.non_monotonic_fetches) == 0, \
f"Non-monotomic fetches found, number of nm fetches: {final_state.non_monotonic_fetches}"

self.logger.info(
f"Successfully consumed {final_state.total_consumed_messages} messages"
)

finally:
#failure_injector.stop()
troublemaker.stop()
troublemaker.join()
producer.stop()
verifier.stop()
producer.wait(timeout_sec=600)