From c75ef6d7b2bc817bc91efade7d2e25bb06e575e9 Mon Sep 17 00:00:00 2001 From: Michael Davis Date: Mon, 3 Nov 2025 14:56:26 -0500 Subject: [PATCH 1/3] Update osiris to v1.10.1 (cherry picked from commit f4b99042819a2b784ae557621447de96937d7d32) --- rabbitmq-components.mk | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rabbitmq-components.mk b/rabbitmq-components.mk index 35ce5210dc2..61891fc7b63 100644 --- a/rabbitmq-components.mk +++ b/rabbitmq-components.mk @@ -49,7 +49,7 @@ dep_jose = hex 1.11.10 dep_khepri = hex 0.17.2 dep_khepri_mnesia_migration = hex 0.8.0 dep_meck = hex 1.0.0 -dep_osiris = git https://github.com/rabbitmq/osiris v1.10.0 +dep_osiris = git https://github.com/rabbitmq/osiris v1.10.1 dep_prometheus = hex 5.1.1 dep_ra = hex 2.17.1 dep_ranch = hex 2.2.0 From 53db29b1f3dfdbec37c247fcae28c569056960d3 Mon Sep 17 00:00:00 2001 From: Michael Davis Date: Mon, 3 Nov 2025 14:56:20 -0500 Subject: [PATCH 2/3] Add `stream.read_ahead_limit` config option (cherry picked from commit f40cecd0a1962b5f0f7716fc4720e35671d45593) # Conflicts: # deps/rabbit/src/rabbit_stream_queue.erl --- deps/rabbit/priv/schema/rabbit.schema | 5 ++++ deps/rabbit/src/rabbit_stream_queue.erl | 29 +++++++++++++++++-- .../config_schema_SUITE_data/rabbit.snippets | 20 +++++++++++++ .../src/rabbit_stream_reader.erl | 3 +- 4 files changed, 53 insertions(+), 4 deletions(-) diff --git a/deps/rabbit/priv/schema/rabbit.schema b/deps/rabbit/priv/schema/rabbit.schema index 682cfd8a27f..f2098733683 100644 --- a/deps/rabbit/priv/schema/rabbit.schema +++ b/deps/rabbit/priv/schema/rabbit.schema @@ -2779,6 +2779,11 @@ end}. {mapping, "stream.read_ahead", "rabbit.stream_read_ahead", [{datatype, {enum, [true, false]}}]}. +{mapping, "stream.read_ahead_limit", "rabbit.stream_read_ahead_limit", [ + {datatype, [integer, string]}, + {validators, ["is_supported_information_unit"]} +]}. + {mapping, "cluster_tags.$tag", "rabbit.cluster_tags", [ {datatype, [binary]} ]}. diff --git a/deps/rabbit/src/rabbit_stream_queue.erl b/deps/rabbit/src/rabbit_stream_queue.erl index eec285a8095..9a428856009 100644 --- a/deps/rabbit/src/rabbit_stream_queue.erl +++ b/deps/rabbit/src/rabbit_stream_queue.erl @@ -50,7 +50,7 @@ -export([format_osiris_event/2]). -export([update_stream_conf/2]). -export([readers/1]). --export([read_ahead_on/0]). +-export([read_ahead_on/0, read_ahead_limit/0]). -export([parse_offset_arg/1, filter_spec/1]). @@ -468,7 +468,8 @@ begin_stream(#stream_client{name = QName, Tag, Offset, Mode, AckRequired, Filter, Options0) when is_pid(LocalPid) -> CounterSpec = {{?MODULE, QName, Tag, self()}, []}, - Options1 = Options0#{read_ahead => read_ahead_on()}, + Options1 = Options0#{read_ahead => read_ahead_on(), + read_ahead_limit => read_ahead_limit()}, {ok, Seg0} = osiris:init_reader(LocalPid, Offset, CounterSpec, Options1), NextOffset = osiris_log:next_offset(Seg0) - 1, osiris:register_offset_listener(LocalPid, NextOffset), @@ -1531,4 +1532,26 @@ shrink_all(_Node) -> {error, not_quorum_queue}. read_ahead_on() -> - application:get_env(rabbit, stream_read_ahead, true). \ No newline at end of file +<<<<<<< HEAD + application:get_env(rabbit, stream_read_ahead, true). +======= + application:get_env(rabbit, stream_read_ahead, true). + +-spec read_ahead_limit() -> integer() | undefined. +read_ahead_limit() -> + case application:get_env(rabbit, stream_read_ahead_limit, undefined) of + undefined -> + undefined; + Bytes when is_integer(Bytes) -> + Bytes; + Limit when is_list(Limit) -> + case rabbit_resource_monitor_misc:parse_information_unit(Limit) of + {ok, ParsedLimit} -> + ParsedLimit; + {error, parse_error} -> + ?LOG_ERROR("Unable to parse stream read ahead limit value " + "~tp", [Limit]), + undefined + end + end. +>>>>>>> f40cecd0a (Add `stream.read_ahead_limit` config option) diff --git a/deps/rabbit/test/config_schema_SUITE_data/rabbit.snippets b/deps/rabbit/test/config_schema_SUITE_data/rabbit.snippets index a61e9917515..3bd611c4ea8 100644 --- a/deps/rabbit/test/config_schema_SUITE_data/rabbit.snippets +++ b/deps/rabbit/test/config_schema_SUITE_data/rabbit.snippets @@ -1266,6 +1266,26 @@ credential_validator.regexp = ^abc\\d+", [{rabbit, [ {stream_read_ahead, false} ]}], + []}, + + %% + %% Stream read limit + %% + {stream_read_ahead_limit_bytes, + " + stream.read_ahead_limit = 8192 + ", + [{rabbit, [ + {stream_read_ahead_limit, 8192} + ]}], + []}, + {stream_read_ahead_limit_information_unit, + " + stream.read_ahead_limit = 8KiB + ", + [{rabbit, [ + {stream_read_ahead_limit, "8KiB"} + ]}], []} ]. diff --git a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl index aac8bd4abd8..575d587a6d3 100644 --- a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl +++ b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl @@ -2814,7 +2814,8 @@ init_reader(ConnectionTransport, CounterSpec = {{?MODULE, QueueResource, SubscriptionId, self()}, []}, Options0 = #{transport => ConnectionTransport, chunk_selector => get_chunk_selector(Properties), - read_ahead => rabbit_stream_queue:read_ahead_on()}, + read_ahead => rabbit_stream_queue:read_ahead_on(), + read_ahead_limit => rabbit_stream_queue:read_ahead_limit()}, Options1 = maps:merge(Options0, rabbit_stream_utils:filter_spec(Properties)), From 7242f280beb9b7aab9596d2883275709cd78dc65 Mon Sep 17 00:00:00 2001 From: Michael Davis Date: Mon, 3 Nov 2025 18:52:49 -0500 Subject: [PATCH 3/3] Resolve merge conflicts Looks like the trailing line ending mixed up the automatic merge. --- deps/rabbit/src/rabbit_stream_queue.erl | 4 ---- 1 file changed, 4 deletions(-) diff --git a/deps/rabbit/src/rabbit_stream_queue.erl b/deps/rabbit/src/rabbit_stream_queue.erl index 9a428856009..7258bd0236c 100644 --- a/deps/rabbit/src/rabbit_stream_queue.erl +++ b/deps/rabbit/src/rabbit_stream_queue.erl @@ -1532,9 +1532,6 @@ shrink_all(_Node) -> {error, not_quorum_queue}. read_ahead_on() -> -<<<<<<< HEAD - application:get_env(rabbit, stream_read_ahead, true). -======= application:get_env(rabbit, stream_read_ahead, true). -spec read_ahead_limit() -> integer() | undefined. @@ -1554,4 +1551,3 @@ read_ahead_limit() -> undefined end end. ->>>>>>> f40cecd0a (Add `stream.read_ahead_limit` config option)