Skip to content

Commit 6d56229

Browse files
Drop msg on requeue if above delivery_limit (#996)
Drops messages directly on requeue if they have been redelivered more than x-delivery-limit times, instead of waiting for the next time we try to deliver them. Also drops messages when a new policy is applied and removes the check on delivery-count on delivery. Fixes #976
1 parent f300dce commit 6d56229

File tree

3 files changed

+48
-8
lines changed

3 files changed

+48
-8
lines changed

spec/policies_spec.cr

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -204,6 +204,25 @@ describe LavinMQ::VHost do
204204
end
205205
end
206206

207+
it "should drop messages if above delivery-limit" do
208+
with_amqp_server do |s|
209+
defs = {"delivery-limit" => JSON::Any.new(0_i64)} of String => JSON::Any
210+
with_channel(s) do |ch|
211+
args = AMQP::Client::Arguments.new
212+
args["x-delivery-limit"] = 5
213+
q = ch.queue("delivery-limit", exclusive: true, args: args)
214+
q.publish_confirm "m1"
215+
q.publish_confirm "m2"
216+
q.get(no_ack: false).not_nil!.reject(requeue: true)
217+
ch.queue_declare("delivery-limit", passive: true)[:message_count].should eq 2
218+
s.vhosts["/"].add_policy("delivery-limit", "^.*$", "all", defs, 12_i8)
219+
sleep 10.milliseconds
220+
ch.queue_declare("delivery-limit", passive: true)[:message_count].should eq 1
221+
s.vhosts["/"].delete_policy("delivery-limit")
222+
end
223+
end
224+
end
225+
207226
describe "with max-length-bytes policy applied" do
208227
it "should replace with max-length" do
209228
with_amqp_server do |s|

spec/server_spec.cr

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -959,7 +959,7 @@ describe LavinMQ::Server do
959959
with_amqp_server do |s|
960960
with_channel(s) do |ch|
961961
args = AMQP::Client::Arguments.new
962-
args["x-delivery-limit"] = 2
962+
args["x-delivery-limit"] = 1
963963
q = ch.queue("delivery_limit", args: args)
964964
q.publish "m1"
965965
msg = q.get(no_ack: false).not_nil!
@@ -969,7 +969,6 @@ describe LavinMQ::Server do
969969
msg.properties.headers.not_nil!["x-delivery-count"].as(Int32).should eq 1
970970
msg.reject(requeue: true)
971971
Fiber.yield
972-
q.get(no_ack: false).should be_nil
973972
s.vhosts["/"].queues["delivery_limit"].empty?.should be_true
974973
end
975974
end

src/lavinmq/amqp/queue/queue.cr

Lines changed: 28 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -236,6 +236,7 @@ module LavinMQ::AMQP
236236
when "delivery-limit"
237237
unless @delivery_limit.try &.< v.as_i64
238238
@delivery_limit = v.as_i64
239+
drop_redelivered
239240
end
240241
when "federation-upstream"
241242
@vhost.upstreams.try &.link(v.as_s, self)
@@ -514,6 +515,27 @@ module LavinMQ::AMQP
514515
end
515516
end
516517

518+
private def drop_redelivered : Nil
519+
counter = 0
520+
if limit = @delivery_limit
521+
@msg_store_lock.synchronize do
522+
loop do
523+
env = @msg_store.first? || break
524+
delivery_count = @deliveries.fetch(env.segment_position, 0) || break
525+
break unless delivery_count > limit
526+
env = @msg_store.shift? || break
527+
@log.debug { "Over delivery limit, drop sp=#{env.segment_position}" }
528+
expire_msg(env, :delivery_limit)
529+
counter &+= 1
530+
if counter >= 16 * 1024
531+
Fiber.yield
532+
counter = 0
533+
end
534+
end
535+
end
536+
end
537+
end
538+
517539
private def time_to_message_expiration : Time::Span?
518540
env = @msg_store_lock.synchronize { @msg_store.first? } || return
519541
@log.debug { "Checking if message #{env.message} has to be expired" }
@@ -789,15 +811,10 @@ module LavinMQ::AMQP
789811
end
790812

791813
private def with_delivery_count_header(env) : Envelope?
792-
if limit = @delivery_limit
814+
if @delivery_limit
793815
sp = env.segment_position
794816
headers = env.message.properties.headers || AMQP::Table.new
795817
delivery_count = @deliveries.fetch(sp, 0)
796-
# @log.debug { "Delivery count: #{delivery_count} Delivery limit: #{@delivery_limit}" }
797-
if delivery_count >= limit
798-
expire_msg(env, :delivery_limit)
799-
return nil
800-
end
801818
headers["x-delivery-count"] = delivery_count if delivery_count > 0 # x-delivery-count not included in first delivery
802819
@deliveries[sp] = delivery_count + 1
803820
env.message.properties.headers = headers
@@ -834,6 +851,11 @@ module LavinMQ::AMQP
834851
if has_expired?(sp, requeue: true) # guarantee to not deliver expired messages
835852
expire_msg(sp, :expired)
836853
else
854+
if delivery_limit = @delivery_limit
855+
if @deliveries.fetch(sp, 0) > delivery_limit
856+
return expire_msg(sp, :delivery_limit)
857+
end
858+
end
837859
@msg_store_lock.synchronize do
838860
@msg_store.requeue(sp)
839861
end

0 commit comments

Comments
 (0)