Skip to content

Commit 89707d7

Browse files
committed
CQ: Remove set_queue_mode/set_queue_version
1 parent 9e8ff48 commit 89707d7

File tree

5 files changed

+21
-133
lines changed

5 files changed

+21
-133
lines changed

deps/rabbit/src/rabbit_amqqueue_process.erl

Lines changed: 1 addition & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -490,9 +490,7 @@ process_args_policy(State = #q{q = Q,
490490
{<<"message-ttl">>, fun res_min/2, fun init_ttl/2},
491491
{<<"max-length">>, fun res_min/2, fun init_max_length/2},
492492
{<<"max-length-bytes">>, fun res_min/2, fun init_max_bytes/2},
493-
{<<"overflow">>, fun res_arg/2, fun init_overflow/2},
494-
{<<"queue-mode">>, fun res_arg/2, fun init_queue_mode/2},
495-
{<<"queue-version">>, fun res_arg/2, fun init_queue_version/2}],
493+
{<<"overflow">>, fun res_arg/2, fun init_overflow/2}],
496494
drop_expired_msgs(
497495
lists:foldl(fun({Name, Resolve, Fun}, StateN) ->
498496
Fun(rabbit_queue_type_util:args_policy_lookup(Name, Resolve, Q), StateN)
@@ -543,22 +541,6 @@ init_overflow(Overflow, State) ->
543541
State#q{overflow = OverflowVal}
544542
end.
545543

546-
init_queue_mode(undefined, State) ->
547-
State;
548-
init_queue_mode(Mode, State = #q {backing_queue = BQ,
549-
backing_queue_state = BQS}) ->
550-
BQS1 = BQ:set_queue_mode(binary_to_existing_atom(Mode, utf8), BQS),
551-
State#q{backing_queue_state = BQS1}.
552-
553-
init_queue_version(Version0, State = #q {backing_queue = BQ,
554-
backing_queue_state = BQS}) ->
555-
Version = case Version0 of
556-
undefined -> 2;
557-
_ -> Version0
558-
end,
559-
BQS1 = BQ:set_queue_version(Version, BQS),
560-
State#q{backing_queue_state = BQS1}.
561-
562544
reply(Reply, NewState) ->
563545
{NewState1, Timeout} = next_state(NewState),
564546
{reply, Reply, ensure_stats_timer(ensure_rate_timer(NewState1)), Timeout}.

deps/rabbit/src/rabbit_backing_queue.erl

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -215,10 +215,6 @@
215215
%% or discarded previously).
216216
-callback is_duplicate(mc:state(), state()) -> {boolean(), state()}.
217217

218-
-callback set_queue_mode(queue_mode(), state()) -> state().
219-
220-
-callback set_queue_version(queue_version(), state()) -> state().
221-
222218
-callback zip_msgs_and_acks([delivered_publish()],
223219
[ack()], Acc, state())
224220
-> Acc.

deps/rabbit/src/rabbit_priority_queue.erl

Lines changed: 1 addition & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,7 @@
3232
ackfold/4, len/1, is_empty/1, depth/1,
3333
update_rates/1, needs_timeout/1, timeout/1,
3434
handle_pre_hibernate/1, resume/1, msg_rates/1,
35-
info/2, invoke/3, is_duplicate/2, set_queue_mode/2,
36-
set_queue_version/2,
35+
info/2, invoke/3, is_duplicate/2,
3736
zip_msgs_and_acks/4,
3837
format_state/1]).
3938

@@ -390,16 +389,6 @@ is_duplicate(Msg, State = #state{bq = BQ}) ->
390389
is_duplicate(Msg, State = #passthrough{bq = BQ, bqs = BQS}) ->
391390
?passthrough2(is_duplicate(Msg, BQS)).
392391

393-
set_queue_mode(Mode, State = #state{bq = BQ}) ->
394-
foreach1(fun (_P, BQSN) -> BQ:set_queue_mode(Mode, BQSN) end, State);
395-
set_queue_mode(Mode, State = #passthrough{bq = BQ, bqs = BQS}) ->
396-
?passthrough1(set_queue_mode(Mode, BQS)).
397-
398-
set_queue_version(Version, State = #state{bq = BQ}) ->
399-
foreach1(fun (_P, BQSN) -> BQ:set_queue_version(Version, BQSN) end, State);
400-
set_queue_version(Version, State = #passthrough{bq = BQ, bqs = BQS}) ->
401-
?passthrough1(set_queue_version(Version, BQS)).
402-
403392
zip_msgs_and_acks(Msgs, AckTags, Accumulator, #state{bqss = [{MaxP, _} |_]}) ->
404393
MsgsByPriority = partition_publish_delivered_batch(Msgs, MaxP),
405394
lists:foldl(fun (Acks, MAs) ->

deps/rabbit/src/rabbit_variable_queue.erl

Lines changed: 3 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,8 @@
1515
ackfold/4, len/1, is_empty/1, depth/1,
1616
update_rates/1, needs_timeout/1, timeout/1,
1717
handle_pre_hibernate/1, resume/1, msg_rates/1,
18-
info/2, invoke/3, is_duplicate/2, set_queue_mode/2,
19-
set_queue_version/2, zip_msgs_and_acks/4,
18+
info/2, invoke/3, is_duplicate/2,
19+
zip_msgs_and_acks/4,
2020
format_state/1]).
2121

2222
-export([start/2, stop/1]).
@@ -746,20 +746,12 @@ invoke( _, _, State) -> State.
746746

747747
is_duplicate(_Msg, State) -> {false, State}.
748748

749-
%% Queue mode has been unified.
750-
set_queue_mode(_, State) ->
751-
State.
752-
753749
zip_msgs_and_acks(Msgs, AckTags, Accumulator, _State) ->
754750
lists:foldl(fun ({{Msg, _Props}, AckTag}, Acc) ->
755751
Id = mc:get_annotation(id, Msg),
756752
[{Id, AckTag} | Acc]
757753
end, Accumulator, lists:zip(Msgs, AckTags)).
758754

759-
%% Queue version now ignored; only v2 is available.
760-
set_queue_version(_, State) ->
761-
State.
762-
763755
%% Get the Timestamp property of the first msg, if present. This is
764756
%% the one with the oldest timestamp among the heads of the pending
765757
%% acks and unread queues. We can't check disk_pending_acks as these
@@ -1872,6 +1864,7 @@ read_from_q_tail(DelsAndAcksFun,
18721864
%% For v2 we want to limit the number of messages read at once to lower
18731865
%% the memory footprint. We use the consume rate to determine how many
18741866
%% messages we read.
1867+
%% @todo Simply ask for N messages instead of low/high bounds.
18751868
QTailSeqLimit = QTailSeqId + MemoryLimit,
18761869
QTailSeqId1 =
18771870
lists:min([rabbit_classic_queue_index_v2:next_segment_boundary(QTailSeqId),

deps/rabbit/test/backing_queue_SUITE.erl

Lines changed: 16 additions & 88 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@
3939
-define(BACKING_QUEUE_TESTCASES, [
4040
bq_queue_index,
4141
bq_queue_index_props,
42-
{variable_queue_default, [parallel], ?VARIABLE_QUEUE_TESTCASES},
42+
{variable_queue, [parallel], ?VARIABLE_QUEUE_TESTCASES},
4343
bq_variable_queue_delete_msg_store_files_callback,
4444
bq_queue_recover
4545
]).
@@ -127,8 +127,6 @@ init_per_group1(backing_queue_embed_limit_1024, Config) ->
127127
ok = rabbit_ct_broker_helpers:rpc(Config, 0,
128128
application, set_env, [rabbit, queue_index_embed_msgs_below, 1024]),
129129
Config;
130-
init_per_group1(variable_queue_default, Config) ->
131-
rabbit_ct_helpers:set_config(Config, {variable_queue_type, default});
132130
%% @todo These groups are no longer used?
133131
init_per_group1(from_cluster_node1, Config) ->
134132
rabbit_ct_helpers:set_config(Config, {test_direction, {0, 1}});
@@ -1169,9 +1167,7 @@ variable_queue_partial_segments_q_tail_thing(Config) ->
11691167
?MODULE, variable_queue_partial_segments_q_tail_thing1, [Config]).
11701168

11711169
variable_queue_partial_segments_q_tail_thing1(Config) ->
1172-
with_fresh_variable_queue(
1173-
fun variable_queue_partial_segments_q_tail_thing2/2,
1174-
?config(variable_queue_type, Config)).
1170+
with_fresh_variable_queue(fun variable_queue_partial_segments_q_tail_thing2/2).
11751171

11761172
variable_queue_partial_segments_q_tail_thing2(VQ0, _QName) ->
11771173
IndexMod = index_mod(),
@@ -1216,9 +1212,7 @@ variable_queue_all_the_bits_not_covered_elsewhere_A(Config) ->
12161212
?MODULE, variable_queue_all_the_bits_not_covered_elsewhere_A1, [Config]).
12171213

12181214
variable_queue_all_the_bits_not_covered_elsewhere_A1(Config) ->
1219-
with_fresh_variable_queue(
1220-
fun variable_queue_all_the_bits_not_covered_elsewhere_A2/2,
1221-
?config(variable_queue_type, Config)).
1215+
with_fresh_variable_queue(fun variable_queue_all_the_bits_not_covered_elsewhere_A2/2).
12221216

12231217
variable_queue_all_the_bits_not_covered_elsewhere_A2(VQ0, QName) ->
12241218
IndexMod = index_mod(),
@@ -1243,9 +1237,7 @@ variable_queue_all_the_bits_not_covered_elsewhere_B(Config) ->
12431237
?MODULE, variable_queue_all_the_bits_not_covered_elsewhere_B1, [Config]).
12441238

12451239
variable_queue_all_the_bits_not_covered_elsewhere_B1(Config) ->
1246-
with_fresh_variable_queue(
1247-
fun variable_queue_all_the_bits_not_covered_elsewhere_B2/2,
1248-
?config(variable_queue_type, Config)).
1240+
with_fresh_variable_queue(fun variable_queue_all_the_bits_not_covered_elsewhere_B2/2).
12491241

12501242
variable_queue_all_the_bits_not_covered_elsewhere_B2(VQ1, QName) ->
12511243
VQ2 = variable_queue_publish(false, 4, VQ1),
@@ -1263,9 +1255,7 @@ variable_queue_drop(Config) ->
12631255
?MODULE, variable_queue_drop1, [Config]).
12641256

12651257
variable_queue_drop1(Config) ->
1266-
with_fresh_variable_queue(
1267-
fun variable_queue_drop2/2,
1268-
?config(variable_queue_type, Config)).
1258+
with_fresh_variable_queue(fun variable_queue_drop2/2).
12691259

12701260
variable_queue_drop2(VQ0, _QName) ->
12711261
%% start by sending a messages
@@ -1288,9 +1278,7 @@ variable_queue_fold_msg_on_disk(Config) ->
12881278
?MODULE, variable_queue_fold_msg_on_disk1, [Config]).
12891279

12901280
variable_queue_fold_msg_on_disk1(Config) ->
1291-
with_fresh_variable_queue(
1292-
fun variable_queue_fold_msg_on_disk2/2,
1293-
?config(variable_queue_type, Config)).
1281+
with_fresh_variable_queue(fun variable_queue_fold_msg_on_disk2/2).
12941282

12951283
variable_queue_fold_msg_on_disk2(VQ0, _QName) ->
12961284
VQ1 = variable_queue_publish(true, 1, VQ0),
@@ -1304,9 +1292,7 @@ variable_queue_dropfetchwhile(Config) ->
13041292
?MODULE, variable_queue_dropfetchwhile1, [Config]).
13051293

13061294
variable_queue_dropfetchwhile1(Config) ->
1307-
with_fresh_variable_queue(
1308-
fun variable_queue_dropfetchwhile2/2,
1309-
?config(variable_queue_type, Config)).
1295+
with_fresh_variable_queue(fun variable_queue_dropfetchwhile2/2).
13101296

13111297
variable_queue_dropfetchwhile2(VQ0, _QName) ->
13121298
Count = 10,
@@ -1352,9 +1338,7 @@ variable_queue_dropwhile_restart(Config) ->
13521338
?MODULE, variable_queue_dropwhile_restart1, [Config]).
13531339

13541340
variable_queue_dropwhile_restart1(Config) ->
1355-
with_fresh_variable_queue(
1356-
fun variable_queue_dropwhile_restart2/2,
1357-
?config(variable_queue_type, Config)).
1341+
with_fresh_variable_queue(fun variable_queue_dropwhile_restart2/2).
13581342

13591343
variable_queue_dropwhile_restart2(VQ0, QName) ->
13601344
Count = 10000,
@@ -1391,9 +1375,7 @@ variable_queue_dropwhile_sync_restart(Config) ->
13911375
?MODULE, variable_queue_dropwhile_sync_restart1, [Config]).
13921376

13931377
variable_queue_dropwhile_sync_restart1(Config) ->
1394-
with_fresh_variable_queue(
1395-
fun variable_queue_dropwhile_sync_restart2/2,
1396-
?config(variable_queue_type, Config)).
1378+
with_fresh_variable_queue(fun variable_queue_dropwhile_sync_restart2/2).
13971379

13981380
variable_queue_dropwhile_sync_restart2(VQ0, QName) ->
13991381
Count = 10000,
@@ -1433,9 +1415,7 @@ variable_queue_restart_large_seq_id(Config) ->
14331415
?MODULE, variable_queue_restart_large_seq_id1, [Config]).
14341416

14351417
variable_queue_restart_large_seq_id1(Config) ->
1436-
with_fresh_variable_queue(
1437-
fun variable_queue_restart_large_seq_id2/2,
1438-
?config(variable_queue_type, Config)).
1418+
with_fresh_variable_queue(fun variable_queue_restart_large_seq_id2/2).
14391419

14401420
variable_queue_restart_large_seq_id2(VQ0, QName) ->
14411421
Count = 1,
@@ -1472,9 +1452,7 @@ variable_queue_ack_limiting(Config) ->
14721452
?MODULE, variable_queue_ack_limiting1, [Config]).
14731453

14741454
variable_queue_ack_limiting1(Config) ->
1475-
with_fresh_variable_queue(
1476-
fun variable_queue_ack_limiting2/2,
1477-
?config(variable_queue_type, Config)).
1455+
with_fresh_variable_queue(fun variable_queue_ack_limiting2/2).
14781456

14791457
variable_queue_ack_limiting2(VQ0, _Config) ->
14801458
%% start by sending in a bunch of messages
@@ -1502,9 +1480,7 @@ variable_queue_purge(Config) ->
15021480
?MODULE, variable_queue_purge1, [Config]).
15031481

15041482
variable_queue_purge1(Config) ->
1505-
with_fresh_variable_queue(
1506-
fun variable_queue_purge2/2,
1507-
?config(variable_queue_type, Config)).
1483+
with_fresh_variable_queue(fun variable_queue_purge2/2).
15081484

15091485
variable_queue_purge2(VQ0, _Config) ->
15101486
LenDepth = fun (VQ) ->
@@ -1526,9 +1502,7 @@ variable_queue_requeue(Config) ->
15261502
?MODULE, variable_queue_requeue1, [Config]).
15271503

15281504
variable_queue_requeue1(Config) ->
1529-
with_fresh_variable_queue(
1530-
fun variable_queue_requeue2/2,
1531-
?config(variable_queue_type, Config)).
1505+
with_fresh_variable_queue(fun variable_queue_requeue2/2).
15321506

15331507
variable_queue_requeue2(VQ0, _Config) ->
15341508
{_PendingMsgs, RequeuedMsgs, FreshMsgs, VQ1} =
@@ -1554,9 +1528,7 @@ variable_queue_requeue_ram_beta(Config) ->
15541528
?MODULE, variable_queue_requeue_ram_beta1, [Config]).
15551529

15561530
variable_queue_requeue_ram_beta1(Config) ->
1557-
with_fresh_variable_queue(
1558-
fun variable_queue_requeue_ram_beta2/2,
1559-
?config(variable_queue_type, Config)).
1531+
with_fresh_variable_queue(fun variable_queue_requeue_ram_beta2/2).
15601532

15611533
variable_queue_requeue_ram_beta2(VQ0, _Config) ->
15621534
IndexMod = index_mod(),
@@ -1571,46 +1543,6 @@ variable_queue_requeue_ram_beta2(VQ0, _Config) ->
15711543
{_, VQ8} = rabbit_variable_queue:ack(AcksAll, VQ7),
15721544
VQ8.
15731545

1574-
%% same as test_variable_queue_requeue_ram_beta but randomly changing
1575-
%% the queue mode after every step.
1576-
variable_queue_mode_change(Config) ->
1577-
passed = rabbit_ct_broker_helpers:rpc(Config, 0,
1578-
?MODULE, variable_queue_mode_change1, [Config]).
1579-
1580-
variable_queue_mode_change1(Config) ->
1581-
with_fresh_variable_queue(
1582-
fun variable_queue_mode_change2/2,
1583-
?config(variable_queue_type, Config)).
1584-
1585-
variable_queue_mode_change2(VQ0, _Config) ->
1586-
IndexMod = index_mod(),
1587-
Count = IndexMod:next_segment_boundary(0)*2 + 2,
1588-
VQ1 = variable_queue_publish(false, Count, VQ0),
1589-
VQ2 = maybe_switch_queue_mode(VQ1),
1590-
{VQ3, AcksR} = variable_queue_fetch(Count, false, false, Count, VQ2),
1591-
VQ4 = maybe_switch_queue_mode(VQ3),
1592-
{Back, Front} = lists:split(Count div 2, AcksR),
1593-
{_, VQ5} = rabbit_variable_queue:requeue(erlang:tl(Back), VQ4),
1594-
VQ6 = maybe_switch_queue_mode(VQ5),
1595-
VQ8 = maybe_switch_queue_mode(VQ6),
1596-
{_, VQ9} = rabbit_variable_queue:requeue([erlang:hd(Back)], VQ8),
1597-
VQ10 = maybe_switch_queue_mode(VQ9),
1598-
VQ11 = requeue_one_by_one(Front, VQ10),
1599-
VQ12 = maybe_switch_queue_mode(VQ11),
1600-
{VQ13, AcksAll} = variable_queue_fetch(Count, false, true, Count, VQ12),
1601-
VQ14 = maybe_switch_queue_mode(VQ13),
1602-
{_, VQ15} = rabbit_variable_queue:ack(AcksAll, VQ14),
1603-
VQ16 = maybe_switch_queue_mode(VQ15),
1604-
VQ16.
1605-
1606-
maybe_switch_queue_mode(VQ) ->
1607-
Mode = random_queue_mode(),
1608-
set_queue_mode(Mode, VQ).
1609-
1610-
random_queue_mode() ->
1611-
Modes = [lazy, default],
1612-
lists:nth(rand:uniform(length(Modes)), Modes).
1613-
16141546
pub_res({_, VQS}) ->
16151547
VQS;
16161548
pub_res(VQS) ->
@@ -1758,7 +1690,7 @@ wait_for_confirms(Unconfirmed) ->
17581690
end
17591691
end.
17601692

1761-
with_fresh_variable_queue(Fun, Mode) ->
1693+
with_fresh_variable_queue(Fun) ->
17621694
Ref = make_ref(),
17631695
Me = self(),
17641696
%% Run in a separate process since rabbit_msg_store will send
@@ -1771,10 +1703,9 @@ with_fresh_variable_queue(Fun, Mode) ->
17711703
assert_props(S0, [{q_head, 0},
17721704
{q_tail, {q_tail, undefined, 0, undefined}},
17731705
{len, 0}]),
1774-
VQ1 = set_queue_mode(Mode, VQ),
17751706
try
17761707
_ = rabbit_variable_queue:delete_and_terminate(
1777-
shutdown, Fun(VQ1, QName)),
1708+
shutdown, Fun(VQ, QName)),
17781709
Me ! Ref
17791710
catch
17801711
Type:Error:Stacktrace ->
@@ -1787,9 +1718,6 @@ with_fresh_variable_queue(Fun, Mode) ->
17871718
end,
17881719
passed.
17891720

1790-
set_queue_mode(Mode, VQ) ->
1791-
rabbit_variable_queue:set_queue_mode(Mode, VQ).
1792-
17931721
variable_queue_publish(IsPersistent, Count, VQ) ->
17941722
variable_queue_publish(IsPersistent, Count, fun (_N, P) -> P end, VQ).
17951723

0 commit comments

Comments
 (0)