Skip to content

Commit 01979d8

Browse files
the-mikedavismergify[bot]
authored andcommitted
Merge stream.read_ahead and stream.read_ahead_limit options
Read-ahead can be controlled by a single option, with zero acting the same as `false`. (cherry picked from commit 76ce4ee) # Conflicts: # deps/rabbit/src/rabbit_stream_queue.erl
1 parent 7e43377 commit 01979d8

File tree

5 files changed

+21
-22
lines changed

5 files changed

+21
-22
lines changed

deps/rabbit/priv/schema/rabbit.schema

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2777,12 +2777,7 @@ fun(Conf) ->
27772777
end}.
27782778

27792779
{mapping, "stream.read_ahead", "rabbit.stream_read_ahead",
2780-
[{datatype, {enum, [true, false]}}]}.
2781-
2782-
{mapping, "stream.read_ahead_limit", "rabbit.stream_read_ahead_limit", [
2783-
{datatype, [integer, string]},
2784-
{validators, ["is_supported_information_unit"]}
2785-
]}.
2780+
[{datatype, [{enum, [true, false]}, integer, string]}]}.
27862781

27872782
{mapping, "cluster_tags.$tag", "rabbit.cluster_tags", [
27882783
{datatype, [binary]}

deps/rabbit/src/rabbit_stream_queue.erl

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@
5050
-export([format_osiris_event/2]).
5151
-export([update_stream_conf/2]).
5252
-export([readers/1]).
53-
-export([read_ahead_on/0, read_ahead_limit/0]).
53+
-export([read_ahead/0]).
5454

5555
-export([parse_offset_arg/1,
5656
filter_spec/1]).
@@ -468,8 +468,7 @@ begin_stream(#stream_client{name = QName,
468468
Tag, Offset, Mode, AckRequired, Filter, Options0)
469469
when is_pid(LocalPid) ->
470470
CounterSpec = {{?MODULE, QName, Tag, self()}, []},
471-
Options1 = Options0#{read_ahead => read_ahead_on(),
472-
read_ahead_limit => read_ahead_limit()},
471+
Options1 = Options0#{read_ahead => read_ahead()},
473472
{ok, Seg0} = osiris:init_reader(LocalPid, Offset, CounterSpec, Options1),
474473
NextOffset = osiris_log:next_offset(Seg0) - 1,
475474
osiris:register_offset_listener(LocalPid, NextOffset),
@@ -1528,6 +1527,7 @@ queue_vm_ets() ->
15281527
{[],
15291528
[]}.
15301529

1530+
<<<<<<< HEAD
15311531
shrink_all(_Node) ->
15321532
{error, not_quorum_queue}.
15331533

@@ -1541,13 +1541,22 @@ read_ahead_limit() ->
15411541
undefined;
15421542
Bytes when is_integer(Bytes) ->
15431543
Bytes;
1544+
=======
1545+
-spec read_ahead() -> boolean() | non_neg_integer().
1546+
read_ahead() ->
1547+
case application:get_env(rabbit, stream_read_ahead, true) of
1548+
Toggle when is_boolean(Toggle) ->
1549+
Toggle;
1550+
LimitBytes when is_integer(LimitBytes) ->
1551+
LimitBytes;
1552+
>>>>>>> 76ce4ee42 (Merge `stream.read_ahead` and `stream.read_ahead_limit` options)
15441553
Limit when is_list(Limit) ->
15451554
case rabbit_resource_monitor_misc:parse_information_unit(Limit) of
15461555
{ok, ParsedLimit} ->
15471556
ParsedLimit;
15481557
{error, parse_error} ->
15491558
?LOG_ERROR("Unable to parse stream read ahead limit value "
15501559
"~tp", [Limit]),
1551-
undefined
1560+
true
15521561
end
15531562
end.

deps/rabbit/test/config_schema_SUITE_data/rabbit.snippets

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1247,7 +1247,7 @@ credential_validator.regexp = ^abc\\d+",
12471247
[]},
12481248

12491249
%%
1250-
%% Stream read ahead on/off
1250+
%% Stream read ahead on/off/limit
12511251
%%
12521252

12531253
{stream_read_ahead,
@@ -1267,24 +1267,20 @@ credential_validator.regexp = ^abc\\d+",
12671267
{stream_read_ahead, false}
12681268
]}],
12691269
[]},
1270-
1271-
%%
1272-
%% Stream read limit
1273-
%%
12741270
{stream_read_ahead_limit_bytes,
12751271
"
1276-
stream.read_ahead_limit = 8192
1272+
stream.read_ahead = 8192
12771273
",
12781274
[{rabbit, [
1279-
{stream_read_ahead_limit, 8192}
1275+
{stream_read_ahead, 8192}
12801276
]}],
12811277
[]},
12821278
{stream_read_ahead_limit_information_unit,
12831279
"
1284-
stream.read_ahead_limit = 8KiB
1280+
stream.read_ahead = 8KiB
12851281
",
12861282
[{rabbit, [
1287-
{stream_read_ahead_limit, "8KiB"}
1283+
{stream_read_ahead, "8KiB"}
12881284
]}],
12891285
[]}
12901286

deps/rabbitmq_stream/src/rabbit_stream_reader.erl

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2814,8 +2814,7 @@ init_reader(ConnectionTransport,
28142814
CounterSpec = {{?MODULE, QueueResource, SubscriptionId, self()}, []},
28152815
Options0 = #{transport => ConnectionTransport,
28162816
chunk_selector => get_chunk_selector(Properties),
2817-
read_ahead => rabbit_stream_queue:read_ahead_on(),
2818-
read_ahead_limit => rabbit_stream_queue:read_ahead_limit()},
2817+
read_ahead => rabbit_stream_queue:read_ahead()},
28192818

28202819
Options1 = maps:merge(Options0,
28212820
rabbit_stream_utils:filter_spec(Properties)),

rabbitmq-components.mk

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ dep_jose = hex 1.11.10
4949
dep_khepri = hex 0.17.2
5050
dep_khepri_mnesia_migration = hex 0.8.0
5151
dep_meck = hex 1.0.0
52-
dep_osiris = git https://github.com/rabbitmq/osiris v1.10.1
52+
dep_osiris = git https://github.com/rabbitmq/osiris v1.10.2
5353
dep_prometheus = hex 5.1.1
5454
dep_ra = hex 2.17.1
5555
dep_ranch = hex 2.2.0

0 commit comments

Comments
 (0)