From 19fc32847768913f7d8efea189c164a44058045d Mon Sep 17 00:00:00 2001 From: joe-redpanda Date: Tue, 5 Aug 2025 22:07:24 -0700 Subject: [PATCH] kafka/client/test/data_queue_test add chaos Adds a chaos test to data queue tests. --- .../kafka/client/direct_consumer/data_queue.h | 3 + src/v/kafka/client/test/BUILD | 1 + src/v/kafka/client/test/data_queue_test.cc | 79 +++++++++++++++++++ 3 files changed, 83 insertions(+) diff --git a/src/v/kafka/client/direct_consumer/data_queue.h b/src/v/kafka/client/direct_consumer/data_queue.h index ec157e547f8ba..3c539e242eb67 100644 --- a/src/v/kafka/client/direct_consumer/data_queue.h +++ b/src/v/kafka/client/direct_consumer/data_queue.h @@ -75,6 +75,9 @@ class data_queue { chunked_vector topics; size_t total_bytes{0}; }; + + fetches do_pop(); + size_t _max_count{10}; size_t _max_bytes{10_MiB}; diff --git a/src/v/kafka/client/test/BUILD b/src/v/kafka/client/test/BUILD index 817439b433788..bff0dcd73952a 100644 --- a/src/v/kafka/client/test/BUILD +++ b/src/v/kafka/client/test/BUILD @@ -440,6 +440,7 @@ redpanda_cc_gtest( "data_queue_test.cc", ], deps = [ + "//src/v/base", "//src/v/kafka/client/direct_consumer", "//src/v/model/tests:random", "//src/v/test_utils:gtest", diff --git a/src/v/kafka/client/test/data_queue_test.cc b/src/v/kafka/client/test/data_queue_test.cc index 95a6d8916beba..ede70c986bdd3 100644 --- a/src/v/kafka/client/test/data_queue_test.cc +++ b/src/v/kafka/client/test/data_queue_test.cc @@ -7,13 +7,20 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0 +#include "base/vassert.h" #include "kafka/client/direct_consumer/data_queue.h" +#include "test_utils/async.h" #include "test_utils/test.h" #include +#include +#include + using namespace kafka::client; +namespace { + static constexpr size_t max_bytes = 1024; static constexpr size_t max_count = 10; @@ -29,6 +36,7 @@ make_data(int topic_count, int bytes_per_topic) { } return data; } +} // namespace TEST(DataQueueTest, TestIfCanInsertWorks) { data_queue queue(max_bytes, max_count); @@ -228,3 +236,74 @@ TEST(DataQueueTest, TestSizeAndCurrentBytesTracking) { ASSERT_EQ(queue.size(), 0); ASSERT_EQ(queue.current_bytes(), 0); } + +TEST_CORO(DataQueueTest, TestChaos) { + static constexpr size_t payload_size{128}; + static constexpr std::chrono::duration phase_runtime = std::chrono::seconds( + 5); + data_queue queue(max_bytes, max_count); + + bool should_push{true}; + bool should_pop{true}; + size_t push_counter{1}; + size_t pop_counter{1}; + + std::random_device r{}; + std::default_random_engine random_engine(r()); + std::uniform_int_distribution sleep_generator(1, 100); + + // s.t. one can be pushed to run faster than the other + uint push_delay_multiplier{1}; + uint pop_delay_multiplier{1}; + + // this is fine so long as the lambda itself lives longer than any spawned + // execution + // NOLINTNEXTLINE(cppcoreguidelines-avoid-capturing-lambda-coroutines) + auto push_fiber = [&] -> ss::future<> { + while (should_push) { + auto data = make_data(1, payload_size); + // sneak a sequence number into the payload + data[0].total_bytes = push_counter++; + co_await queue.push(std::move(data), payload_size); + co_await ss::sleep(std::chrono::milliseconds( + sleep_generator(random_engine) * push_delay_multiplier)); + } + co_return; + }; + + // this is fine so long as the lambda itself lives longer than any spawned + // execution + // NOLINTNEXTLINE(cppcoreguidelines-avoid-capturing-lambda-coroutines) + auto pop_fiber = [&] -> ss::future<> { + while (should_pop) { + auto data = co_await queue.pop(std::chrono::milliseconds(1000)); + auto found_number = data.value().at(0).total_bytes; + vassert( + found_number == pop_counter++, "data from queue out of order"); + co_await ss::sleep(std::chrono::milliseconds( + sleep_generator(random_engine) * pop_delay_multiplier)); + } + co_return; + }; + + auto push_fiber_task = push_fiber(); + auto pop_fiber_task = pop_fiber(); + + co_await ss::sleep(phase_runtime); + + push_delay_multiplier = 2; + pop_delay_multiplier = 1; + + co_await ss::sleep(phase_runtime); + + push_delay_multiplier = 1; + pop_delay_multiplier = 2; + + co_await ss::sleep(phase_runtime); + + should_push = false; + co_await std::move(push_fiber_task); + + should_pop = false; + co_await std::move(pop_fiber_task); +}