Skip to content

Commit dec4e24

Browse files
Merge pull request #14948 from rabbitmq/mergify/bp/v4.2.x/pr-14894
Merge `stream.read_ahead` and `stream.read_ahead_limit` options (backport #14894)
2 parents 7e43377 + 7b93c92 commit dec4e24

File tree

5 files changed

+20
-52
lines changed

5 files changed

+20
-52
lines changed

deps/rabbit/priv/schema/rabbit.schema

Lines changed: 3 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -2776,24 +2776,15 @@ fun(Conf) ->
27762776
end
27772777
end}.
27782778

2779+
%% See github.com/rabbitmq/osiris#192 and https://github.com/rabbitmq/rabbitmq-server/commit/9f162dfd01c1516d168e7d1fad39d33a929756e5
2780+
%% Enables read-head
27792781
{mapping, "stream.read_ahead", "rabbit.stream_read_ahead",
2780-
[{datatype, {enum, [true, false]}}]}.
2781-
2782-
{mapping, "stream.read_ahead_limit", "rabbit.stream_read_ahead_limit", [
2783-
{datatype, [integer, string]},
2784-
{validators, ["is_supported_information_unit"]}
2785-
]}.
2782+
[{datatype, [{enum, [true, false]}, integer, string]}]}.
27862783

27872784
{mapping, "cluster_tags.$tag", "rabbit.cluster_tags", [
27882785
{datatype, [binary]}
27892786
]}.
27902787

2791-
2792-
%% See github.com/rabbitmq/osiris#192 and https://github.com/rabbitmq/rabbitmq-server/commit/9f162dfd01c1516d168e7d1fad39d33a929756e5
2793-
%% Enables read-head
2794-
{mapping, "stream.read_ahead", "rabbit.stream_read_ahead",
2795-
[{datatype, {enum, [true, false]}}]}.
2796-
27972788
{translation, "rabbit.cluster_tags",
27982789
fun(Conf) ->
27992790
case cuttlefish:conf_get("cluster_tags", Conf, undefined) of

deps/rabbit/src/rabbit_stream_queue.erl

Lines changed: 10 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@
5050
-export([format_osiris_event/2]).
5151
-export([update_stream_conf/2]).
5252
-export([readers/1]).
53-
-export([read_ahead_on/0, read_ahead_limit/0]).
53+
-export([read_ahead/0]).
5454

5555
-export([parse_offset_arg/1,
5656
filter_spec/1]).
@@ -468,8 +468,7 @@ begin_stream(#stream_client{name = QName,
468468
Tag, Offset, Mode, AckRequired, Filter, Options0)
469469
when is_pid(LocalPid) ->
470470
CounterSpec = {{?MODULE, QName, Tag, self()}, []},
471-
Options1 = Options0#{read_ahead => read_ahead_on(),
472-
read_ahead_limit => read_ahead_limit()},
471+
Options1 = Options0#{read_ahead => read_ahead()},
473472
{ok, Seg0} = osiris:init_reader(LocalPid, Offset, CounterSpec, Options1),
474473
NextOffset = osiris_log:next_offset(Seg0) - 1,
475474
osiris:register_offset_listener(LocalPid, NextOffset),
@@ -1531,23 +1530,20 @@ queue_vm_ets() ->
15311530
shrink_all(_Node) ->
15321531
{error, not_quorum_queue}.
15331532

1534-
read_ahead_on() ->
1535-
application:get_env(rabbit, stream_read_ahead, true).
1536-
1537-
-spec read_ahead_limit() -> integer() | undefined.
1538-
read_ahead_limit() ->
1539-
case application:get_env(rabbit, stream_read_ahead_limit, undefined) of
1540-
undefined ->
1541-
undefined;
1542-
Bytes when is_integer(Bytes) ->
1543-
Bytes;
1533+
-spec read_ahead() -> boolean() | non_neg_integer().
1534+
read_ahead() ->
1535+
case application:get_env(rabbit, stream_read_ahead, true) of
1536+
Toggle when is_boolean(Toggle) ->
1537+
Toggle;
1538+
LimitBytes when is_integer(LimitBytes) ->
1539+
LimitBytes;
15441540
Limit when is_list(Limit) ->
15451541
case rabbit_resource_monitor_misc:parse_information_unit(Limit) of
15461542
{ok, ParsedLimit} ->
15471543
ParsedLimit;
15481544
{error, parse_error} ->
15491545
?LOG_ERROR("Unable to parse stream read ahead limit value "
15501546
"~tp", [Limit]),
1551-
undefined
1547+
true
15521548
end
15531549
end.

deps/rabbit/test/config_schema_SUITE_data/rabbit.snippets

Lines changed: 5 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1224,19 +1224,6 @@ credential_validator.regexp = ^abc\\d+",
12241224
]}],
12251225
[]},
12261226

1227-
%%
1228-
%% Stream read ahead on/off
1229-
%%
1230-
1231-
{stream_read_ahead,
1232-
"
1233-
stream.read_ahead = false
1234-
",
1235-
[{rabbit, [
1236-
{stream_read_ahead, false}
1237-
]}],
1238-
[]},
1239-
12401227
{stream_replication_port_range,
12411228
"
12421229
stream.replication.port_range.max = 4600
@@ -1247,7 +1234,7 @@ credential_validator.regexp = ^abc\\d+",
12471234
[]},
12481235

12491236
%%
1250-
%% Stream read ahead on/off
1237+
%% Stream read ahead on/off/limit
12511238
%%
12521239

12531240
{stream_read_ahead,
@@ -1267,25 +1254,20 @@ credential_validator.regexp = ^abc\\d+",
12671254
{stream_read_ahead, false}
12681255
]}],
12691256
[]},
1270-
1271-
%%
1272-
%% Stream read limit
1273-
%%
12741257
{stream_read_ahead_limit_bytes,
12751258
"
1276-
stream.read_ahead_limit = 8192
1259+
stream.read_ahead = 8192
12771260
",
12781261
[{rabbit, [
1279-
{stream_read_ahead_limit, 8192}
1262+
{stream_read_ahead, 8192}
12801263
]}],
12811264
[]},
12821265
{stream_read_ahead_limit_information_unit,
12831266
"
1284-
stream.read_ahead_limit = 8KiB
1267+
stream.read_ahead = 8KiB
12851268
",
12861269
[{rabbit, [
1287-
{stream_read_ahead_limit, "8KiB"}
1270+
{stream_read_ahead, "8KiB"}
12881271
]}],
12891272
[]}
1290-
12911273
].

deps/rabbitmq_stream/src/rabbit_stream_reader.erl

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2814,8 +2814,7 @@ init_reader(ConnectionTransport,
28142814
CounterSpec = {{?MODULE, QueueResource, SubscriptionId, self()}, []},
28152815
Options0 = #{transport => ConnectionTransport,
28162816
chunk_selector => get_chunk_selector(Properties),
2817-
read_ahead => rabbit_stream_queue:read_ahead_on(),
2818-
read_ahead_limit => rabbit_stream_queue:read_ahead_limit()},
2817+
read_ahead => rabbit_stream_queue:read_ahead()},
28192818

28202819
Options1 = maps:merge(Options0,
28212820
rabbit_stream_utils:filter_spec(Properties)),

rabbitmq-components.mk

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ dep_jose = hex 1.11.10
4949
dep_khepri = hex 0.17.2
5050
dep_khepri_mnesia_migration = hex 0.8.0
5151
dep_meck = hex 1.0.0
52-
dep_osiris = git https://github.com/rabbitmq/osiris v1.10.1
52+
dep_osiris = git https://github.com/rabbitmq/osiris v1.10.2
5353
dep_prometheus = hex 5.1.1
5454
dep_ra = hex 2.17.1
5555
dep_ranch = hex 2.2.0

0 commit comments

Comments
 (0)