Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions src/v/kafka/client/direct_consumer/data_queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,9 @@ class data_queue {
chunked_vector<fetched_topic_data> topics;
size_t total_bytes{0};
};

fetches do_pop();

size_t _max_count{10};
size_t _max_bytes{10_MiB};

Expand Down
1 change: 1 addition & 0 deletions src/v/kafka/client/test/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -440,6 +440,7 @@ redpanda_cc_gtest(
"data_queue_test.cc",
],
deps = [
"//src/v/base",
"//src/v/kafka/client/direct_consumer",
"//src/v/model/tests:random",
"//src/v/test_utils:gtest",
Expand Down
79 changes: 79 additions & 0 deletions src/v/kafka/client/test/data_queue_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,20 @@
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0

#include "base/vassert.h"
#include "kafka/client/direct_consumer/data_queue.h"
#include "test_utils/async.h"
#include "test_utils/test.h"

#include <gtest/gtest.h>

#include <chrono>
#include <random>

using namespace kafka::client;

namespace {

static constexpr size_t max_bytes = 1024;
static constexpr size_t max_count = 10;

Expand All @@ -29,6 +36,7 @@ make_data(int topic_count, int bytes_per_topic) {
}
return data;
}
} // namespace

TEST(DataQueueTest, TestIfCanInsertWorks) {
data_queue queue(max_bytes, max_count);
Expand Down Expand Up @@ -228,3 +236,74 @@ TEST(DataQueueTest, TestSizeAndCurrentBytesTracking) {
ASSERT_EQ(queue.size(), 0);
ASSERT_EQ(queue.current_bytes(), 0);
}

TEST_CORO(DataQueueTest, TestChaos) {
static constexpr size_t payload_size{128};
static constexpr std::chrono::duration phase_runtime = std::chrono::seconds(
5);
data_queue queue(max_bytes, max_count);

bool should_push{true};
bool should_pop{true};
size_t push_counter{1};
size_t pop_counter{1};

std::random_device r{};
std::default_random_engine random_engine(r());
std::uniform_int_distribution<uint> sleep_generator(1, 100);

// s.t. one can be pushed to run faster than the other
uint push_delay_multiplier{1};
uint pop_delay_multiplier{1};

// this is fine so long as the lambda itself lives longer than any spawned
// execution
// NOLINTNEXTLINE(cppcoreguidelines-avoid-capturing-lambda-coroutines)
auto push_fiber = [&] -> ss::future<> {
while (should_push) {
auto data = make_data(1, payload_size);
// sneak a sequence number into the payload
data[0].total_bytes = push_counter++;
co_await queue.push(std::move(data), payload_size);
co_await ss::sleep(std::chrono::milliseconds(
sleep_generator(random_engine) * push_delay_multiplier));
}
co_return;
};

// this is fine so long as the lambda itself lives longer than any spawned
// execution
// NOLINTNEXTLINE(cppcoreguidelines-avoid-capturing-lambda-coroutines)
auto pop_fiber = [&] -> ss::future<> {
while (should_pop) {
auto data = co_await queue.pop(std::chrono::milliseconds(1000));
auto found_number = data.value().at(0).total_bytes;
vassert(
found_number == pop_counter++, "data from queue out of order");
co_await ss::sleep(std::chrono::milliseconds(
sleep_generator(random_engine) * pop_delay_multiplier));
}
co_return;
};

auto push_fiber_task = push_fiber();
auto pop_fiber_task = pop_fiber();

co_await ss::sleep(phase_runtime);

push_delay_multiplier = 2;
pop_delay_multiplier = 1;

co_await ss::sleep(phase_runtime);

push_delay_multiplier = 1;
pop_delay_multiplier = 2;

co_await ss::sleep(phase_runtime);

should_push = false;
co_await std::move(push_fiber_task);

should_pop = false;
co_await std::move(pop_fiber_task);
}
Loading