1616 ackfold /4 , len /1 , is_empty /1 , depth /1 ,
1717 update_rates /1 , needs_timeout /1 , timeout /1 ,
1818 handle_pre_hibernate /1 , resume /1 , msg_rates /1 ,
19- info /2 , invoke /3 , is_duplicate /2 , set_queue_mode /2 ,
20- set_queue_version /2 ,
19+ info /2 , invoke /3 , is_duplicate /2 ,
2120 start /2 , stop /1 , zip_msgs_and_acks /4 , handle_info /2 ]).
2221
22+ % % Removed in 4.3. @todo Remove in next LTS.
23+ -export ([set_queue_mode /2 , set_queue_version /2 ]).
24+
2325% %----------------------------------------------------------------------------
2426% % This test backing queue follows the variable queue implementation, with
2527% % the exception that it will introduce infinite delays on some operations if
3133-behaviour (rabbit_backing_queue ).
3234
3335-record (vqstate ,
34- { q1 ,
35- q2 ,
36- delta ,
37- q3 ,
38- q4 ,
36+ { q_head ,
37+ q_tail ,
3938 next_seq_id ,
4039 % % seq_id() of first undelivered message
4140 % % everything before this seq_id() was delivered at least once
4241 next_deliver_seq_id ,
43- ram_pending_ack , % % msgs using store, still in RAM
42+ ram_pending_ack , % % msgs still in RAM
4443 disk_pending_ack , % % msgs in store, paged out
45- qi_pending_ack , % % msgs using qi, *can't* be paged out
46- index_mod ,
4744 index_state ,
4845 store_state ,
4946 msg_store_clients ,
5047 durable ,
5148 transient_threshold ,
5249 qi_embed_msgs_below ,
5350
54- len , % % w/o unacked
5551 bytes , % % w/o unacked
5652 unacked_bytes ,
5753 persistent_count , % % w unacked
5854 persistent_bytes , % % w unacked
59- delta_transient_bytes , % %
6055
61- target_ram_count ,
6256 ram_msg_count , % % w/o unacked
6357 ram_msg_count_prev ,
6458 ram_ack_count_prev ,
6559 ram_bytes , % % w unacked
6660 out_counter ,
6761 in_counter ,
6862 rates ,
63+ % % There are two confirms paths: either store/index produce confirms
64+ % % separately (v2 with per-vhost message store) or the confirms
65+ % % are produced all at once while syncing/flushing (v2 with per-queue
66+ % % message store). The latter is more efficient as it avoids many
67+ % % sets operations.
6968 msgs_on_disk ,
7069 msg_indices_on_disk ,
7170 unconfirmed ,
7776 disk_read_count ,
7877 disk_write_count ,
7978
80- io_batch_size ,
81-
82- % % default queue (or lazy queue from 3.6 to 3.11)
83- mode ,
84- version = 1 ,
8579 % % Fast path for confirms handling. Instead of having
8680 % % index/store keep track of confirms separately and
8781 % % doing intersect/subtract/union we just put the messages
109103 msg_props
110104 }).
111105
112- -record (delta ,
106+ -record (q_tail ,
113107 { start_seq_id , % % start_seq_id is inclusive
114108 count ,
115- transient ,
116109 end_seq_id % % end_seq_id is exclusive
117110 }).
118111
131124 ack_out :: float (),
132125 timestamp :: rabbit_types :timestamp ()}.
133126
134- -type delta () :: # delta { start_seq_id :: non_neg_integer (),
135- count :: non_neg_integer (),
136- end_seq_id :: non_neg_integer () }.
127+ -type q_tail () :: # q_tail { start_seq_id :: non_neg_integer (),
128+ count :: non_neg_integer (),
129+ end_seq_id :: non_neg_integer () }.
137130
138131% % The compiler (rightfully) complains that ack() and state() are
139132% % unused. For this reason we duplicate a -spec from
143136% % these here for documentation purposes.
144137-type ack () :: seq_id ().
145138-type state () :: # vqstate {
146- q1 :: ? QUEUE :? QUEUE (),
147- q2 :: ? QUEUE :? QUEUE (),
148- delta :: delta (),
149- q3 :: ? QUEUE :? QUEUE (),
150- q4 :: ? QUEUE :? QUEUE (),
139+ q_head :: ? QUEUE :? QUEUE (),
140+ q_tail :: q_tail (),
151141 next_seq_id :: seq_id (),
142+ next_deliver_seq_id :: seq_id (),
152143 ram_pending_ack :: map (),
153144 disk_pending_ack :: map (),
154- qi_pending_ack :: map (),
155145 index_state :: any (),
146+ store_state :: any (),
156147 msg_store_clients :: 'undefined' | {{any (), binary ()},
157148 {any (), binary ()}},
158149 durable :: boolean (),
159150 transient_threshold :: non_neg_integer (),
160151 qi_embed_msgs_below :: non_neg_integer (),
161152
162- len :: non_neg_integer (),
163153 bytes :: non_neg_integer (),
164154 unacked_bytes :: non_neg_integer (),
165-
166155 persistent_count :: non_neg_integer (),
167156 persistent_bytes :: non_neg_integer (),
168157
169- target_ram_count :: non_neg_integer () | 'infinity' ,
170158 ram_msg_count :: non_neg_integer (),
171159 ram_msg_count_prev :: non_neg_integer (),
172160 ram_ack_count_prev :: non_neg_integer (),
183171 disk_read_count :: non_neg_integer (),
184172 disk_write_count :: non_neg_integer (),
185173
186- io_batch_size :: pos_integer (),
187- mode :: 'default' | 'lazy' ,
188- virtual_host :: rabbit_types :vhost () }.
174+ unconfirmed_simple :: sets :set ()}.
175+
189176% % Duplicated from rabbit_backing_queue
190177-spec ack ([ack ()], state ()) -> {[rabbit_guid :guid ()], state ()}.
191178
@@ -214,9 +201,9 @@ delete_crashed(Q) ->
214201purge (State = # vqstate { ram_pending_ack = QPA }) ->
215202 maybe_delay (QPA ),
216203 rabbit_variable_queue :purge (State );
217- % % For v3.9.x and below because the state has changed.
204+ % % For v4.2.x because the state has changed.
218205purge (State ) ->
219- QPA = element (10 , State ),
206+ QPA = element (9 , State ),
220207 maybe_delay (QPA ),
221208 rabbit_variable_queue :purge (State ).
222209
@@ -252,9 +239,9 @@ ack(List, State) ->
252239requeue (AckTags , # vqstate { ram_pending_ack = QPA } = State ) ->
253240 maybe_delay (QPA ),
254241 rabbit_variable_queue :requeue (AckTags , State );
255- % % For v3.9.x and below because the state has changed.
242+ % % For v4.2.x because the state has changed.
256243requeue (AckTags , State ) ->
257- QPA = element (10 , State ),
244+ QPA = element (9 , State ),
258245 maybe_delay (QPA ),
259246 rabbit_variable_queue :requeue (AckTags , State ).
260247
@@ -264,9 +251,9 @@ ackfold(MsgFun, Acc, State, AckTags) ->
264251len (# vqstate { ram_pending_ack = QPA } = State ) ->
265252 maybe_delay (QPA ),
266253 rabbit_variable_queue :len (State );
267- % % For v3.9.x and below because the state has changed.
254+ % % For v4.2.x because the state has changed.
268255len (State ) ->
269- QPA = element (10 , State ),
256+ QPA = element (9 , State ),
270257 maybe_delay (QPA ),
271258 rabbit_variable_queue :len (State ).
272259
@@ -302,15 +289,15 @@ invoke(Module, Fun, State) -> rabbit_variable_queue:invoke(Module, Fun, State).
302289
303290is_duplicate (Msg , State ) -> rabbit_variable_queue :is_duplicate (Msg , State ).
304291
305- set_queue_mode (Mode , State ) ->
306- rabbit_variable_queue :set_queue_mode (Mode , State ).
307-
308- set_queue_version (Version , State ) ->
309- rabbit_variable_queue :set_queue_version (Version , State ).
310-
311292zip_msgs_and_acks (Msgs , AckTags , Accumulator , State ) ->
312293 rabbit_variable_queue :zip_msgs_and_acks (Msgs , AckTags , Accumulator , State ).
313294
295+ set_queue_mode (_ , State ) ->
296+ State .
297+
298+ set_queue_version (_ , State ) ->
299+ State .
300+
314301% % Delay
315302maybe_delay (QPA ) ->
316303 % % The structure for ram_pending_acks has changed to maps in 3.12.
0 commit comments