diff --git a/deps/rabbit/priv/schema/rabbit.schema b/deps/rabbit/priv/schema/rabbit.schema index f2098733683..34d3236fa57 100644 --- a/deps/rabbit/priv/schema/rabbit.schema +++ b/deps/rabbit/priv/schema/rabbit.schema @@ -2776,24 +2776,15 @@ fun(Conf) -> end end}. +%% See github.com/rabbitmq/osiris#192 and https://github.com/rabbitmq/rabbitmq-server/commit/9f162dfd01c1516d168e7d1fad39d33a929756e5 +%% Enables read-head {mapping, "stream.read_ahead", "rabbit.stream_read_ahead", - [{datatype, {enum, [true, false]}}]}. - -{mapping, "stream.read_ahead_limit", "rabbit.stream_read_ahead_limit", [ - {datatype, [integer, string]}, - {validators, ["is_supported_information_unit"]} -]}. + [{datatype, [{enum, [true, false]}, integer, string]}]}. {mapping, "cluster_tags.$tag", "rabbit.cluster_tags", [ {datatype, [binary]} ]}. - -%% See github.com/rabbitmq/osiris#192 and https://github.com/rabbitmq/rabbitmq-server/commit/9f162dfd01c1516d168e7d1fad39d33a929756e5 -%% Enables read-head -{mapping, "stream.read_ahead", "rabbit.stream_read_ahead", - [{datatype, {enum, [true, false]}}]}. - {translation, "rabbit.cluster_tags", fun(Conf) -> case cuttlefish:conf_get("cluster_tags", Conf, undefined) of diff --git a/deps/rabbit/src/rabbit_stream_queue.erl b/deps/rabbit/src/rabbit_stream_queue.erl index 7258bd0236c..d66b059b6fc 100644 --- a/deps/rabbit/src/rabbit_stream_queue.erl +++ b/deps/rabbit/src/rabbit_stream_queue.erl @@ -50,7 +50,7 @@ -export([format_osiris_event/2]). -export([update_stream_conf/2]). -export([readers/1]). --export([read_ahead_on/0, read_ahead_limit/0]). +-export([read_ahead/0]). -export([parse_offset_arg/1, filter_spec/1]). @@ -468,8 +468,7 @@ begin_stream(#stream_client{name = QName, Tag, Offset, Mode, AckRequired, Filter, Options0) when is_pid(LocalPid) -> CounterSpec = {{?MODULE, QName, Tag, self()}, []}, - Options1 = Options0#{read_ahead => read_ahead_on(), - read_ahead_limit => read_ahead_limit()}, + Options1 = Options0#{read_ahead => read_ahead()}, {ok, Seg0} = osiris:init_reader(LocalPid, Offset, CounterSpec, Options1), NextOffset = osiris_log:next_offset(Seg0) - 1, osiris:register_offset_listener(LocalPid, NextOffset), @@ -1531,16 +1530,13 @@ queue_vm_ets() -> shrink_all(_Node) -> {error, not_quorum_queue}. -read_ahead_on() -> - application:get_env(rabbit, stream_read_ahead, true). - --spec read_ahead_limit() -> integer() | undefined. -read_ahead_limit() -> - case application:get_env(rabbit, stream_read_ahead_limit, undefined) of - undefined -> - undefined; - Bytes when is_integer(Bytes) -> - Bytes; +-spec read_ahead() -> boolean() | non_neg_integer(). +read_ahead() -> + case application:get_env(rabbit, stream_read_ahead, true) of + Toggle when is_boolean(Toggle) -> + Toggle; + LimitBytes when is_integer(LimitBytes) -> + LimitBytes; Limit when is_list(Limit) -> case rabbit_resource_monitor_misc:parse_information_unit(Limit) of {ok, ParsedLimit} -> @@ -1548,6 +1544,6 @@ read_ahead_limit() -> {error, parse_error} -> ?LOG_ERROR("Unable to parse stream read ahead limit value " "~tp", [Limit]), - undefined + true end end. diff --git a/deps/rabbit/test/config_schema_SUITE_data/rabbit.snippets b/deps/rabbit/test/config_schema_SUITE_data/rabbit.snippets index 3bd611c4ea8..bdf5f9120e9 100644 --- a/deps/rabbit/test/config_schema_SUITE_data/rabbit.snippets +++ b/deps/rabbit/test/config_schema_SUITE_data/rabbit.snippets @@ -1224,19 +1224,6 @@ credential_validator.regexp = ^abc\\d+", ]}], []}, - %% - %% Stream read ahead on/off - %% - - {stream_read_ahead, - " - stream.read_ahead = false - ", - [{rabbit, [ - {stream_read_ahead, false} - ]}], - []}, - {stream_replication_port_range, " stream.replication.port_range.max = 4600 @@ -1247,7 +1234,7 @@ credential_validator.regexp = ^abc\\d+", []}, %% - %% Stream read ahead on/off + %% Stream read ahead on/off/limit %% {stream_read_ahead, @@ -1267,25 +1254,20 @@ credential_validator.regexp = ^abc\\d+", {stream_read_ahead, false} ]}], []}, - - %% - %% Stream read limit - %% {stream_read_ahead_limit_bytes, " - stream.read_ahead_limit = 8192 + stream.read_ahead = 8192 ", [{rabbit, [ - {stream_read_ahead_limit, 8192} + {stream_read_ahead, 8192} ]}], []}, {stream_read_ahead_limit_information_unit, " - stream.read_ahead_limit = 8KiB + stream.read_ahead = 8KiB ", [{rabbit, [ - {stream_read_ahead_limit, "8KiB"} + {stream_read_ahead, "8KiB"} ]}], []} - ]. diff --git a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl index 575d587a6d3..1f9627a556c 100644 --- a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl +++ b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl @@ -2814,8 +2814,7 @@ init_reader(ConnectionTransport, CounterSpec = {{?MODULE, QueueResource, SubscriptionId, self()}, []}, Options0 = #{transport => ConnectionTransport, chunk_selector => get_chunk_selector(Properties), - read_ahead => rabbit_stream_queue:read_ahead_on(), - read_ahead_limit => rabbit_stream_queue:read_ahead_limit()}, + read_ahead => rabbit_stream_queue:read_ahead()}, Options1 = maps:merge(Options0, rabbit_stream_utils:filter_spec(Properties)), diff --git a/rabbitmq-components.mk b/rabbitmq-components.mk index 61891fc7b63..7fc8aae4fbd 100644 --- a/rabbitmq-components.mk +++ b/rabbitmq-components.mk @@ -49,7 +49,7 @@ dep_jose = hex 1.11.10 dep_khepri = hex 0.17.2 dep_khepri_mnesia_migration = hex 0.8.0 dep_meck = hex 1.0.0 -dep_osiris = git https://github.com/rabbitmq/osiris v1.10.1 +dep_osiris = git https://github.com/rabbitmq/osiris v1.10.2 dep_prometheus = hex 5.1.1 dep_ra = hex 2.17.1 dep_ranch = hex 2.2.0