1616 ackfold /4 , len /1 , is_empty /1 , depth /1 ,
1717 update_rates /1 , needs_timeout /1 , timeout /1 ,
1818 handle_pre_hibernate /1 , resume /1 , msg_rates /1 ,
19- info /2 , invoke /3 , is_duplicate /2 , set_queue_mode /2 ,
20- set_queue_version /2 ,
19+ info /2 , invoke /3 , is_duplicate /2 ,
2120 start /2 , stop /1 , zip_msgs_and_acks /4 , handle_info /2 ]).
2221
2322% %----------------------------------------------------------------------------
3130-behaviour (rabbit_backing_queue ).
3231
3332-record (vqstate ,
34- { q1 ,
35- q2 ,
36- delta ,
37- q3 ,
38- q4 ,
33+ { q_head ,
34+ q_tail ,
3935 next_seq_id ,
4036 % % seq_id() of first undelivered message
4137 % % everything before this seq_id() was delivered at least once
4238 next_deliver_seq_id ,
43- ram_pending_ack , % % msgs using store, still in RAM
39+ ram_pending_ack , % % msgs still in RAM
4440 disk_pending_ack , % % msgs in store, paged out
45- qi_pending_ack , % % msgs using qi, *can't* be paged out
46- index_mod ,
4741 index_state ,
4842 store_state ,
4943 msg_store_clients ,
5044 durable ,
5145 transient_threshold ,
5246 qi_embed_msgs_below ,
5347
54- len , % % w/o unacked
5548 bytes , % % w/o unacked
5649 unacked_bytes ,
5750 persistent_count , % % w unacked
5851 persistent_bytes , % % w unacked
59- delta_transient_bytes , % %
6052
61- target_ram_count ,
6253 ram_msg_count , % % w/o unacked
6354 ram_msg_count_prev ,
6455 ram_ack_count_prev ,
6556 ram_bytes , % % w unacked
6657 out_counter ,
6758 in_counter ,
6859 rates ,
60+ % % There are two confirms paths: either store/index produce confirms
61+ % % separately (v2 with per-vhost message store) or the confirms
62+ % % are produced all at once while syncing/flushing (v2 with per-queue
63+ % % message store). The latter is more efficient as it avoids many
64+ % % sets operations.
6965 msgs_on_disk ,
7066 msg_indices_on_disk ,
7167 unconfirmed ,
7773 disk_read_count ,
7874 disk_write_count ,
7975
80- io_batch_size ,
81-
82- % % default queue (or lazy queue from 3.6 to 3.11)
83- mode ,
84- version = 1 ,
8576 % % Fast path for confirms handling. Instead of having
8677 % % index/store keep track of confirms separately and
8778 % % doing intersect/subtract/union we just put the messages
109100 msg_props
110101 }).
111102
112- -record (delta ,
103+ -record (q_tail ,
113104 { start_seq_id , % % start_seq_id is inclusive
114105 count ,
115- transient ,
116106 end_seq_id % % end_seq_id is exclusive
117107 }).
118108
131121 ack_out :: float (),
132122 timestamp :: rabbit_types :timestamp ()}.
133123
134- -type delta () :: # delta { start_seq_id :: non_neg_integer (),
135- count :: non_neg_integer (),
136- end_seq_id :: non_neg_integer () }.
124+ -type q_tail () :: # q_tail { start_seq_id :: non_neg_integer (),
125+ count :: non_neg_integer (),
126+ end_seq_id :: non_neg_integer () }.
137127
138128% % The compiler (rightfully) complains that ack() and state() are
139129% % unused. For this reason we duplicate a -spec from
143133% % these here for documentation purposes.
144134-type ack () :: seq_id ().
145135-type state () :: # vqstate {
146- q1 :: ? QUEUE :? QUEUE (),
147- q2 :: ? QUEUE :? QUEUE (),
148- delta :: delta (),
149- q3 :: ? QUEUE :? QUEUE (),
150- q4 :: ? QUEUE :? QUEUE (),
136+ q_head :: ? QUEUE :? QUEUE (),
137+ q_tail :: q_tail (),
151138 next_seq_id :: seq_id (),
139+ next_deliver_seq_id :: seq_id (),
152140 ram_pending_ack :: map (),
153141 disk_pending_ack :: map (),
154- qi_pending_ack :: map (),
155142 index_state :: any (),
143+ store_state :: any (),
156144 msg_store_clients :: 'undefined' | {{any (), binary ()},
157145 {any (), binary ()}},
158146 durable :: boolean (),
159147 transient_threshold :: non_neg_integer (),
160148 qi_embed_msgs_below :: non_neg_integer (),
161149
162- len :: non_neg_integer (),
163150 bytes :: non_neg_integer (),
164151 unacked_bytes :: non_neg_integer (),
165-
166152 persistent_count :: non_neg_integer (),
167153 persistent_bytes :: non_neg_integer (),
168154
169- target_ram_count :: non_neg_integer () | 'infinity' ,
170155 ram_msg_count :: non_neg_integer (),
171156 ram_msg_count_prev :: non_neg_integer (),
172157 ram_ack_count_prev :: non_neg_integer (),
183168 disk_read_count :: non_neg_integer (),
184169 disk_write_count :: non_neg_integer (),
185170
186- io_batch_size :: pos_integer (),
187- mode :: 'default' | 'lazy' ,
188- virtual_host :: rabbit_types :vhost () }.
171+ unconfirmed_simple :: sets :set ()}.
172+
189173% % Duplicated from rabbit_backing_queue
190174-spec ack ([ack ()], state ()) -> {[rabbit_guid :guid ()], state ()}.
191175
@@ -214,9 +198,9 @@ delete_crashed(Q) ->
214198purge (State = # vqstate { ram_pending_ack = QPA }) ->
215199 maybe_delay (QPA ),
216200 rabbit_variable_queue :purge (State );
217- % % For v3.9.x and below because the state has changed.
201+ % % For v4.2.x because the state has changed.
218202purge (State ) ->
219- QPA = element (10 , State ),
203+ QPA = element (9 , State ),
220204 maybe_delay (QPA ),
221205 rabbit_variable_queue :purge (State ).
222206
@@ -252,9 +236,9 @@ ack(List, State) ->
252236requeue (AckTags , # vqstate { ram_pending_ack = QPA } = State ) ->
253237 maybe_delay (QPA ),
254238 rabbit_variable_queue :requeue (AckTags , State );
255- % % For v3.9.x and below because the state has changed.
239+ % % For v4.2.x because the state has changed.
256240requeue (AckTags , State ) ->
257- QPA = element (10 , State ),
241+ QPA = element (9 , State ),
258242 maybe_delay (QPA ),
259243 rabbit_variable_queue :requeue (AckTags , State ).
260244
@@ -264,9 +248,9 @@ ackfold(MsgFun, Acc, State, AckTags) ->
264248len (# vqstate { ram_pending_ack = QPA } = State ) ->
265249 maybe_delay (QPA ),
266250 rabbit_variable_queue :len (State );
267- % % For v3.9.x and below because the state has changed.
251+ % % For v4.2.x because the state has changed.
268252len (State ) ->
269- QPA = element (10 , State ),
253+ QPA = element (9 , State ),
270254 maybe_delay (QPA ),
271255 rabbit_variable_queue :len (State ).
272256
@@ -302,12 +286,6 @@ invoke(Module, Fun, State) -> rabbit_variable_queue:invoke(Module, Fun, State).
302286
303287is_duplicate (Msg , State ) -> rabbit_variable_queue :is_duplicate (Msg , State ).
304288
305- set_queue_mode (Mode , State ) ->
306- rabbit_variable_queue :set_queue_mode (Mode , State ).
307-
308- set_queue_version (Version , State ) ->
309- rabbit_variable_queue :set_queue_version (Version , State ).
310-
311289zip_msgs_and_acks (Msgs , AckTags , Accumulator , State ) ->
312290 rabbit_variable_queue :zip_msgs_and_acks (Msgs , AckTags , Accumulator , State ).
313291
0 commit comments