Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 13 additions & 6 deletions src/v/kafka/server/fetch_memory_units.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,12 @@ namespace kafka {
fetch_memory_units_manager::fetch_memory_units_manager(
ssx::semaphore& kafka_units,
ssx::semaphore& fetch_units,
local_instance_fn&& local_fn)
local_instance_fn&& local_fn,
config::binding<std::optional<int32_t>>&& max_message_size)
: _kafka_units(kafka_units)
, _fetch_units(fetch_units)
, _max_fetch_units(fetch_units.current())
, _max_message_size(std::move(max_message_size))
, _release_units_timer([this] { release_all_units_to_semaphore(); })
, _local_instance_fn(std::move(local_fn)) {
_release_units_timer.arm_periodic(max_release_period);
Expand Down Expand Up @@ -90,18 +92,23 @@ fetch_memory_units fetch_memory_units_manager::allocate_memory_units(
max_bytes = _max_fetch_units;
}

if (max_batch_size > _max_fetch_units) {
const auto& configured_max = _max_message_size();
const size_t max_message_size = configured_max.has_value()
? static_cast<size_t>(*configured_max)
: _max_fetch_units;

if (max_batch_size > max_message_size) {
thread_local static ss::logger::rate_limit rate_limit(rate);
klog.log(
ss::log_level::error,
rate_limit,
"{}: max_batch_size({}) exceeds available fetch memory ({}). "
"{}: max_batch_size({}) exceeds max message size ({}). "
"Consider reducing `message.max.bytes` for the topic. Setting "
"max_batch_size to available fetch memory.",
"max_batch_size to max message size.",
ktp,
max_batch_size,
_max_fetch_units);
max_batch_size = _max_fetch_units;
max_message_size);
max_batch_size = max_message_size;
}

const size_t available_units = std::min(
Expand Down
5 changes: 4 additions & 1 deletion src/v/kafka/server/fetch_memory_units.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
#pragma once
#include "base/seastarx.h"
#include "base/units.h"
#include "config/property.h"
#include "container/chunked_hash_map.h"
#include "model/ktp.h"
#include "ssx/semaphore.h"
Expand Down Expand Up @@ -47,7 +48,8 @@ class fetch_memory_units_manager {
fetch_memory_units_manager(
ssx::semaphore& kafka_units,
ssx::semaphore& fetch_units,
local_instance_fn&& local_fn);
local_instance_fn&& local_fn,
config::binding<std::optional<int32_t>>&&);

ss::future<> stop();

Expand Down Expand Up @@ -114,6 +116,7 @@ class fetch_memory_units_manager {
ssx::semaphore& _kafka_units;
ssx::semaphore& _fetch_units;
size_t _max_fetch_units;
config::binding<std::optional<int32_t>> _max_message_size;

ss::timer<> _release_units_timer;
// Collected units are aggregated together by shard in order to ensure there
Expand Down
3 changes: 2 additions & 1 deletion src/v/kafka/server/server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,8 @@ server::server(
memory_fetch_sem(),
[this] -> fetch_memory_units_manager& {
return container().local().fetch_units_manager();
})
},
config::shard_local_cfg().kafka_max_message_size_upper_limit_bytes.bind())
, _probe(std::make_unique<class kafka_probe>())
, _sasl_probe(std::make_unique<class sasl_probe>())
, _read_dist_probe(std::make_unique<read_distribution_probe>())
Expand Down
1 change: 1 addition & 0 deletions src/v/kafka/server/tests/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,7 @@ redpanda_cc_gtest(
cpu = 2,
deps = [
"//src/v/base",
"//src/v/config",
"//src/v/kafka/server",
"//src/v/ssx:semaphore",
"//src/v/test_utils:gtest",
Expand Down
33 changes: 32 additions & 1 deletion src/v/kafka/server/tests/fetch_memory_units_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@
* by the Apache License, Version 2.0
*/

#include "config/configuration.h"
#include "config/mock_property.h"
#include "config/property.h"
#include "kafka/server/fetch_memory_units.h"
#include "ssx/semaphore.h"
#include "test_utils/test.h"
Expand Down Expand Up @@ -41,18 +44,22 @@ class fetch_memory_units_test_fixture : public seastar_test {
ss::future<> SetUpAsync() override {
co_await _kafka_sem.start(100_MiB, ss::sstring("kafka_sem"));
co_await _fetch_sem.start(50_MiB, ss::sstring("fetch_sem"));
co_await _max_message_size.start(std::nullopt);
co_await _manager.start(
ss::sharded_parameter(
[this] { return std::reference_wrapper(_kafka_sem.local()); }),
ss::sharded_parameter(
[this] { return std::reference_wrapper(_fetch_sem.local()); }),
[this] -> kafka::fetch_memory_units_manager& {
return _manager.local();
});
},
ss::sharded_parameter(
[this] { return _max_message_size.local().bind(); }));
}

ss::future<> TearDownAsync() override {
co_await _manager.stop();
co_await _max_message_size.stop();
co_await _kafka_sem.stop();
co_await _fetch_sem.stop();
}
Expand Down Expand Up @@ -83,11 +90,18 @@ class fetch_memory_units_test_fixture : public seastar_test {
[target_units](auto& fs) { set_units(fs, target_units); });
}

// Push a new `kafka_max_message_size_upper_limit_bytes` value into the mock
// config property on every shard, so each shard's binding observes the update.
ss::future<> set_max_message_size(std::optional<int32_t> s) {
    return _max_message_size.invoke_on_all([s](auto& prop) {
        // `update` needs a fresh optional value; build an explicit copy.
        std::optional next{s};
        prop.update(std::move(next));
    });
}

private:
ss::sharded<ssx::semaphore> _kafka_sem;
ss::sharded<ssx::semaphore> _fetch_sem;
ss::sharded<kafka::fetch_memory_units_manager> _manager;
ss::sharded<ssx::semaphore> _sem;
ss::sharded<config::mock_property<std::optional<int32_t>>>
_max_message_size;
};

TEST_F_CORO(fetch_memory_units_test_fixture, test_cross_shard_free) {
Expand Down Expand Up @@ -149,6 +163,23 @@ TEST_F_CORO(fetch_memory_units_test_fixture, test_cross_shard_free) {
EXPECT_EQ(co_await other_kafka_sem_avail(), max_release_size);
}

// Verify that the configured max message size caps the units reserved for a
// single batch, while reservations driven by `max_bytes` are unaffected.
TEST_F_CORO(fetch_memory_units_test_fixture, test_max_units) {
    kafka::fetch_memory_units_manager& mgr = local_manager();

    // Plenty of semaphore capacity, but a tiny per-message cap of 10.
    co_await set_kafka_units(1000);
    co_await set_fetch_units(1000);
    co_await set_max_message_size(10);

    // A batch of 100 is clamped down to the 10-unit message-size cap,
    // regardless of the final boolean argument.
    auto reserved = mgr.allocate_memory_units(model::ktp{}, 1, 100, 1, false);
    EXPECT_EQ(reserved.num_units(), 10);
    reserved = mgr.allocate_memory_units(model::ktp{}, 1, 100, 1, true);
    EXPECT_EQ(reserved.num_units(), 10);

    // `max_bytes` should still be reserved if there are enough units.
    reserved = mgr.allocate_memory_units(model::ktp{}, 100, 1, 1, false);
    EXPECT_EQ(reserved.num_units(), 100);
}

TEST_F_CORO(fetch_memory_units_test_fixture, test_adjust_units) {
kafka::fetch_memory_units_manager& mgr = local_manager();

Expand Down
Loading