diff --git a/src/queue/runner.rs b/src/queue/runner.rs index 3fddba9..2b40e2d 100644 --- a/src/queue/runner.rs +++ b/src/queue/runner.rs @@ -31,6 +31,12 @@ pub fn spawn( "Merge queue runner started" ); + match service::recover_orphaned(&db).await { + Ok(0) => {} + Ok(n) => tracing::info!(count = n, "Recovered orphaned PRs back to queued"), + Err(e) => tracing::error!(error = %e, "Failed to recover orphaned PRs"), + } + loop { ticker.tick().await; diff --git a/src/queue/service.rs b/src/queue/service.rs index 6d92ad9..68cfbcb 100644 --- a/src/queue/service.rs +++ b/src/queue/service.rs @@ -13,7 +13,7 @@ use crate::entity::pull_request::{ use crate::github::types::GhPullRequest; use crate::types::{BatchStatus, PrStatus}; -/// Find a PR currently in the queue. +/// Find a PR currently in the queue (queued status only). pub async fn find_queued_pr( db: &Db, owner: &str, @@ -30,6 +30,28 @@ pub async fn find_queued_pr( .map_err(|e| DbError(e).into_api_error()) } +/// Find a PR in any active status (queued, testing, batched). +pub async fn find_active_pr( + db: &Db, + owner: &str, + repo: &str, + pr_number: i32, +) -> std::result::Result, Error> { + let active_statuses = vec![ + PrStatus::Queued.to_string(), + PrStatus::Testing.to_string(), + PrStatus::Batched.to_string(), + ]; + PullRequest::find() + .filter(PrColumn::RepoOwner.eq(owner)) + .filter(PrColumn::RepoName.eq(repo)) + .filter(PrColumn::PrNumber.eq(pr_number)) + .filter(PrColumn::Status.is_in(active_statuses)) + .one(db.conn()) + .await + .map_err(|e| DbError(e).into_api_error()) +} + /// Add a PR to the merge queue. No-op if already queued. pub async fn enqueue( db: &Db, @@ -68,13 +90,14 @@ pub async fn enqueue( } /// Remove a PR from the queue by marking it as cancelled. +/// Works on any active status (queued, testing, batched). pub async fn dequeue( db: &Db, owner: &str, repo: &str, pr_number: i32, ) -> std::result::Result<(), Error> { - let existing = find_queued_pr(db, owner, repo, pr_number).await?; + let existing = find_active_pr(db, owner, repo, pr_number).await?; if let Some(pr) = existing { let mut active = pr.into_active_model(); @@ -228,6 +251,29 @@ pub async fn update_batch_status( Ok(()) } +/// Recover orphaned PRs stuck in testing/batched back to queued. +/// Called on runner startup to handle interrupted merges. +pub async fn recover_orphaned(db: &Db) -> std::result::Result { + let stuck_statuses = vec![PrStatus::Testing.to_string(), PrStatus::Batched.to_string()]; + let stuck = PullRequest::find() + .filter(PrColumn::Status.is_in(stuck_statuses)) + .all(db.conn()) + .await + .map_err(|e| DbError(e).into_api_error())?; + + let count = stuck.len(); + for pr in stuck { + let mut active = pr.into_active_model(); + active.status = Set(PrStatus::Queued.to_string()); + active + .update(db.conn()) + .await + .map_err(|e| DbError(e).into_api_error())?; + } + + Ok(count) +} + /// Write an entry to the merge_events audit log. pub async fn log_event( db: &Db,