From a15d50f798d70e20b119d54d138196914bdc4d90 Mon Sep 17 00:00:00 2001 From: Brandon Allard Date: Tue, 21 Apr 2026 15:58:35 -0400 Subject: [PATCH] kafka/server: cap fetch memory allocation at max message size limit Use kafka_max_message_size_upper_limit_bytes to clamp max_batch_size in fetch memory allocation instead of the full shard fetch memory budget (max_bytes is still clamped to the shard fetch memory budget). This prevents a single partition with an inflated max.message.bytes (e.g. UINT32_MAX) from monopolizing the entire shard's fetch memory, which can starve other partitions and cause fetch timeouts. (cherry picked from commit 2b2c4b34ac45ba98ae5d9bf621d6b8f0a83d0415) --- src/v/kafka/server/fetch_memory_units.cc | 19 +++++++---- src/v/kafka/server/fetch_memory_units.h | 5 ++- src/v/kafka/server/server.cc | 3 +- src/v/kafka/server/tests/BUILD | 1 + .../server/tests/fetch_memory_units_test.cc | 33 ++++++++++++++++++- 5 files changed, 52 insertions(+), 9 deletions(-) diff --git a/src/v/kafka/server/fetch_memory_units.cc b/src/v/kafka/server/fetch_memory_units.cc index bb0de5a3c2f91..f0051ebd5fe9c 100644 --- a/src/v/kafka/server/fetch_memory_units.cc +++ b/src/v/kafka/server/fetch_memory_units.cc @@ -22,10 +22,12 @@ namespace kafka { fetch_memory_units_manager::fetch_memory_units_manager( ssx::semaphore& kafka_units, ssx::semaphore& fetch_units, - local_instance_fn&& local_fn) + local_instance_fn&& local_fn, + config::binding>&& max_message_size) : _kafka_units(kafka_units) , _fetch_units(fetch_units) , _max_fetch_units(fetch_units.current()) + , _max_message_size(std::move(max_message_size)) , _release_units_timer([this] { release_all_units_to_semaphore(); }) , _local_instance_fn(std::move(local_fn)) { _release_units_timer.arm_periodic(max_release_period); @@ -90,18 +92,23 @@ fetch_memory_units fetch_memory_units_manager::allocate_memory_units( max_bytes = _max_fetch_units; } - if (max_batch_size > _max_fetch_units) { + const auto& configured_max = _max_message_size(); + const size_t max_message_size = 
configured_max.has_value() + ? static_cast(*configured_max) + : _max_fetch_units; + + if (max_batch_size > max_message_size) { thread_local static ss::logger::rate_limit rate_limit(rate); klog.log( ss::log_level::error, rate_limit, - "{}: max_batch_size({}) exceeds available fetch memory ({}). " + "{}: max_batch_size({}) exceeds max message size ({}). " "Consider reducing `message.max.bytes` for the topic. Setting " - "max_batch_size to available fetch memory.", + "max_batch_size to max message size.", ktp, max_batch_size, - _max_fetch_units); - max_batch_size = _max_fetch_units; + max_message_size); + max_batch_size = max_message_size; } const size_t available_units = std::min( diff --git a/src/v/kafka/server/fetch_memory_units.h b/src/v/kafka/server/fetch_memory_units.h index bbee520e247b2..004688d693c4b 100644 --- a/src/v/kafka/server/fetch_memory_units.h +++ b/src/v/kafka/server/fetch_memory_units.h @@ -11,6 +11,7 @@ #pragma once #include "base/seastarx.h" #include "base/units.h" +#include "config/property.h" #include "container/chunked_hash_map.h" #include "model/ktp.h" #include "ssx/semaphore.h" @@ -47,7 +48,8 @@ class fetch_memory_units_manager { fetch_memory_units_manager( ssx::semaphore& kafka_units, ssx::semaphore& fetch_units, - local_instance_fn&& local_fn); + local_instance_fn&& local_fn, + config::binding>&&); ss::future<> stop(); @@ -114,6 +116,7 @@ class fetch_memory_units_manager { ssx::semaphore& _kafka_units; ssx::semaphore& _fetch_units; size_t _max_fetch_units; + config::binding> _max_message_size; ss::timer<> _release_units_timer; // Collected units are aggregated together by shard in order to ensure there diff --git a/src/v/kafka/server/server.cc b/src/v/kafka/server/server.cc index c3296735fddc5..46aebb3aa729b 100644 --- a/src/v/kafka/server/server.cc +++ b/src/v/kafka/server/server.cc @@ -215,7 +215,8 @@ server::server( memory_fetch_sem(), [this] -> fetch_memory_units_manager& { return container().local().fetch_units_manager(); - }) + }, + 
config::shard_local_cfg().kafka_max_message_size_upper_limit_bytes.bind()) , _probe(std::make_unique()) , _sasl_probe(std::make_unique()) , _read_dist_probe(std::make_unique()) diff --git a/src/v/kafka/server/tests/BUILD b/src/v/kafka/server/tests/BUILD index bed380900f374..8eb75262ed4c9 100644 --- a/src/v/kafka/server/tests/BUILD +++ b/src/v/kafka/server/tests/BUILD @@ -219,6 +219,7 @@ redpanda_cc_gtest( cpu = 2, deps = [ "//src/v/base", + "//src/v/config", "//src/v/kafka/server", "//src/v/ssx:semaphore", "//src/v/test_utils:gtest", diff --git a/src/v/kafka/server/tests/fetch_memory_units_test.cc b/src/v/kafka/server/tests/fetch_memory_units_test.cc index 0ccadb2da0539..4b7671dce291c 100644 --- a/src/v/kafka/server/tests/fetch_memory_units_test.cc +++ b/src/v/kafka/server/tests/fetch_memory_units_test.cc @@ -9,6 +9,9 @@ * by the Apache License, Version 2.0 */ +#include "config/configuration.h" +#include "config/mock_property.h" +#include "config/property.h" #include "kafka/server/fetch_memory_units.h" #include "ssx/semaphore.h" #include "test_utils/test.h" @@ -41,6 +44,7 @@ class fetch_memory_units_test_fixture : public seastar_test { ss::future<> SetUpAsync() override { co_await _kafka_sem.start(100_MiB, ss::sstring("kafka_sem")); co_await _fetch_sem.start(50_MiB, ss::sstring("fetch_sem")); + co_await _max_message_size.start(std::nullopt); co_await _manager.start( ss::sharded_parameter( [this] { return std::reference_wrapper(_kafka_sem.local()); }), @@ -48,11 +52,14 @@ class fetch_memory_units_test_fixture : public seastar_test { [this] { return std::reference_wrapper(_fetch_sem.local()); }), [this] -> kafka::fetch_memory_units_manager& { return _manager.local(); - }); + }, + ss::sharded_parameter( + [this] { return _max_message_size.local().bind(); })); } ss::future<> TearDownAsync() override { co_await _manager.stop(); + co_await _max_message_size.stop(); co_await _kafka_sem.stop(); co_await _fetch_sem.stop(); } @@ -83,11 +90,18 @@ class 
fetch_memory_units_test_fixture : public seastar_test { [target_units](auto& fs) { set_units(fs, target_units); }); } + ss::future<> set_max_message_size(std::optional s) { + return _max_message_size.invoke_on_all( + [s](auto& ms) { ms.update(std::optional{s}); }); + } + private: ss::sharded _kafka_sem; ss::sharded _fetch_sem; ss::sharded _manager; ss::sharded _sem; + ss::sharded>> + _max_message_size; }; TEST_F_CORO(fetch_memory_units_test_fixture, test_cross_shard_free) { @@ -149,6 +163,23 @@ TEST_F_CORO(fetch_memory_units_test_fixture, test_cross_shard_free) { EXPECT_EQ(co_await other_kafka_sem_avail(), max_release_size); } +TEST_F_CORO(fetch_memory_units_test_fixture, test_max_units) { + kafka::fetch_memory_units_manager& mgr = local_manager(); + + co_await set_kafka_units(1000); + co_await set_fetch_units(1000); + co_await set_max_message_size(10); + + auto units = mgr.allocate_memory_units(model::ktp{}, 1, 100, 1, false); + EXPECT_EQ(units.num_units(), 10); + units = mgr.allocate_memory_units(model::ktp{}, 1, 100, 1, true); + EXPECT_EQ(units.num_units(), 10); + + // `max_bytes` should still be reserved if there are enough units. + units = mgr.allocate_memory_units(model::ktp{}, 100, 1, 1, false); + EXPECT_EQ(units.num_units(), 100); +} + TEST_F_CORO(fetch_memory_units_test_fixture, test_adjust_units) { kafka::fetch_memory_units_manager& mgr = local_manager();