@@ -53,7 +53,7 @@ enum QueueMessage {
5353 Item ( MergeabilityQueueItem ) ,
5454}
5555
56- #[ derive( PartialEq , Eq ) ]
56+ #[ derive( PartialEq , Eq , Debug ) ]
5757struct Item {
5858 /// When to process item (None = immediate is ordered before Some).
5959 expiration : Option < Instant > ,
@@ -152,6 +152,19 @@ impl MergeabilityQueueSender {
152152 fn insert_item ( & self , item : MergeabilityQueueItem , expiration : Option < Instant > ) {
153153 let mut queue = self . inner . queue . lock ( ) . unwrap ( ) ;
154154
155+ // Make sure that we don't ever put the same pull request twice into the queue
156+ // This might seem a bit inefficient, but linearly iterating through e.g. 1000 PRs should
157+ // be fine.
158+ // We could maybe reset the attempt counter of the PR if it's "refreshed" from the outside,
159+ // but that would require using e.g. Cell to mutate the attempt counter through &, which
160+ // doesn't seem necessary at the moment.
161+ if queue. iter ( ) . any ( |entry| match & entry. 0 . inner {
162+ QueueMessage :: Shutdown => false ,
163+ QueueMessage :: Item ( i) => i. pull_request == item. pull_request ,
164+ } ) {
165+ return ;
166+ }
167+
155168 // Notify when:
156169 // 1. The current item expires sooner than the head of the queue
157170 let has_earlier_expiration =
@@ -311,11 +324,10 @@ pub async fn check_mergeability(
311324#[ cfg( test) ]
312325mod tests {
313326 use crate :: bors:: mergeability_queue:: {
314- MergeabilityQueueItem , QueuedPullRequest , create_mergeability_queue,
327+ BASE_DELAY , MergeabilityQueueItem , QueuedPullRequest , create_mergeability_queue,
315328 } ;
316329 use crate :: github:: PullRequestNumber ;
317330 use crate :: tests:: default_repo_name;
318- use std:: time:: Duration ;
319331
320332 #[ tokio:: test]
321333 async fn order_by_pr_number ( ) {
@@ -335,12 +347,10 @@ mod tests {
335347 #[ tokio:: test]
336348 async fn immediate_before_delayed ( ) {
337349 let ( tx, rx) = create_mergeability_queue ( ) ;
338- tx. enqueue_retry_later ( item ( 5 , 1 ) ) ;
339- tx. enqueue_pr ( default_repo_name ( ) , 1u64 . into ( ) ) ;
340- tokio:: time:: sleep ( Duration :: from_millis ( 100 ) ) . await ;
341350 tx. enqueue_retry_later ( item ( 10 , 1 ) ) ;
351+ tx. enqueue_pr ( default_repo_name ( ) , 2u64 . into ( ) ) ;
342352
343- for expected in [ 1 , 5 , 10 ] {
353+ for expected in [ 2 , 10 ] {
344354 assert_eq ! (
345355 rx. dequeue( ) . await . unwrap( ) . 0 . pull_request. pr_number. 0 ,
346356 expected
@@ -349,17 +359,16 @@ mod tests {
349359 }
350360
351361 #[ tokio:: test]
352- async fn duplicated_pr ( ) {
362+ async fn deduplicate_duplicated_pr ( ) {
353363 let ( tx, rx) = create_mergeability_queue ( ) ;
354- // Handle duplicated PRs, still keep ordering by time
355- tx. enqueue_retry_later ( item ( 5 , 1 ) ) ; // this attempt will be bumped to 2
364+ // Make sure that we don't handle the same PR multiple times
365+ tx. enqueue_retry_later ( item ( 5 , 1 ) ) ;
366+ tx. enqueue_pr ( default_repo_name ( ) , 5u64 . into ( ) ) ;
356367 tx. enqueue_pr ( default_repo_name ( ) , 5u64 . into ( ) ) ;
357368
358- for ( pr, attempt) in [ ( 5 , 1 ) , ( 5 , 2 ) ] {
359- let item = rx. dequeue ( ) . await . unwrap ( ) . 0 ;
360- assert_eq ! ( item. pull_request. pr_number. 0 , pr) ;
361- assert_eq ! ( item. attempt, attempt) ;
362- }
369+ rx. dequeue ( ) . await . unwrap ( ) ;
370+ let res = tokio:: time:: timeout ( BASE_DELAY * 2 , rx. dequeue ( ) ) . await ;
371+ assert ! ( res. is_err( ) ) ;
363372 }
364373
365374 fn item ( pr_number : u64 , attempt : u32 ) -> MergeabilityQueueItem {
0 commit comments