@@ -53,7 +53,7 @@ enum QueueMessage {
5353 Item ( MergeabilityQueueItem ) ,
5454}
5555
56- #[ derive( PartialEq , Eq ) ]
56+ #[ derive( PartialEq , Eq , Debug ) ]
5757struct Item {
5858 /// When to process item (None = immediate is ordered before Some).
5959 expiration : Option < Instant > ,
@@ -152,6 +152,19 @@ impl MergeabilityQueueSender {
152152 fn insert_item ( & self , item : MergeabilityQueueItem , expiration : Option < Instant > ) {
153153 let mut queue = self . inner . queue . lock ( ) . unwrap ( ) ;
154154
155+ // Make sure that we don't ever put the same pull request twice into the queue
156+ // This might seem a bit inefficient, but linearly iterating through e.g. 1000 PRs should
157+ // be fine.
158+ // We could maybe reset the attempt counter of the PR if it's "refreshed" from the outside,
159+ // but that would require using e.g. Cell to mutate the attempt counter through &, which
160+ // doesn't seem necessary at the moment.
161+ if queue. iter ( ) . any ( |entry| match & entry. 0 . inner {
162+ QueueMessage :: Shutdown => false ,
163+ QueueMessage :: Item ( i) => i. pull_request == item. pull_request ,
164+ } ) {
165+ return ;
166+ }
167+
155168 // Notify when:
156169 // 1. The current item expires sooner than the head of the queue
157170 let has_earlier_expiration =
@@ -297,19 +310,12 @@ pub async fn check_mergeability(
297310 }
298311
299312 // We know the mergeability status, so update it in the DB
300- let pr_model = match ctx
301- . db
302- . get_pull_request ( & pull_request. repo , pull_request. pr_number )
303- . await ?
304- {
305- Some ( model) => model,
306- None => {
307- return Err ( anyhow:: anyhow!( "PR not found in database: {pull_request}" ) ) ;
308- }
309- } ;
310-
311313 ctx. db
312- . update_pr_mergeable_state ( & pr_model, new_mergeable_state. clone ( ) . into ( ) )
314+ . set_pr_mergeable_state (
315+ repo_state. repository ( ) ,
316+ fetched_pr. number ,
317+ new_mergeable_state. into ( ) ,
318+ )
313319 . await ?;
314320
315321 Ok ( ( ) )
@@ -318,11 +324,10 @@ pub async fn check_mergeability(
318324#[ cfg( test) ]
319325mod tests {
320326 use crate :: bors:: mergeability_queue:: {
321- MergeabilityQueueItem , QueuedPullRequest , create_mergeability_queue,
327+ BASE_DELAY , MergeabilityQueueItem , QueuedPullRequest , create_mergeability_queue,
322328 } ;
323329 use crate :: github:: PullRequestNumber ;
324330 use crate :: tests:: default_repo_name;
325- use std:: time:: Duration ;
326331
327332 #[ tokio:: test]
328333 async fn order_by_pr_number ( ) {
@@ -342,12 +347,10 @@ mod tests {
342347 #[ tokio:: test]
343348 async fn immediate_before_delayed ( ) {
344349 let ( tx, rx) = create_mergeability_queue ( ) ;
345- tx. enqueue_retry_later ( item ( 5 , 1 ) ) ;
346- tx. enqueue_pr ( default_repo_name ( ) , 1u64 . into ( ) ) ;
347- tokio:: time:: sleep ( Duration :: from_millis ( 100 ) ) . await ;
348350 tx. enqueue_retry_later ( item ( 10 , 1 ) ) ;
351+ tx. enqueue_pr ( default_repo_name ( ) , 2u64 . into ( ) ) ;
349352
350- for expected in [ 1 , 5 , 10 ] {
353+ for expected in [ 2 , 10 ] {
351354 assert_eq ! (
352355 rx. dequeue( ) . await . unwrap( ) . 0 . pull_request. pr_number. 0 ,
353356 expected
@@ -356,17 +359,16 @@ mod tests {
356359 }
357360
358361 #[ tokio:: test]
359- async fn duplicated_pr ( ) {
362+ async fn deduplicate_duplicated_pr ( ) {
360363 let ( tx, rx) = create_mergeability_queue ( ) ;
361- // Handle duplicated PRs, still keep ordering by time
362- tx. enqueue_retry_later ( item ( 5 , 1 ) ) ; // this attempt will be bumped to 2
364+ // Make sure that we don't handle the same PR multiple times
365+ tx. enqueue_retry_later ( item ( 5 , 1 ) ) ;
366+ tx. enqueue_pr ( default_repo_name ( ) , 5u64 . into ( ) ) ;
363367 tx. enqueue_pr ( default_repo_name ( ) , 5u64 . into ( ) ) ;
364368
365- for ( pr, attempt) in [ ( 5 , 1 ) , ( 5 , 2 ) ] {
366- let item = rx. dequeue ( ) . await . unwrap ( ) . 0 ;
367- assert_eq ! ( item. pull_request. pr_number. 0 , pr) ;
368- assert_eq ! ( item. attempt, attempt) ;
369- }
369+ rx. dequeue ( ) . await . unwrap ( ) ;
370+ let res = tokio:: time:: timeout ( BASE_DELAY * 2 , rx. dequeue ( ) ) . await ;
371+ assert ! ( res. is_err( ) ) ;
370372 }
371373
372374 fn item ( pr_number : u64 , attempt : u32 ) -> MergeabilityQueueItem {
0 commit comments