Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions src/queue/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,12 @@ pub fn spawn(
"Merge queue runner started"
);

match service::recover_orphaned(&db).await {
Ok(0) => {}
Ok(n) => tracing::info!(count = n, "Recovered orphaned PRs back to queued"),
Err(e) => tracing::error!(error = %e, "Failed to recover orphaned PRs"),
}

loop {
ticker.tick().await;

Expand Down
50 changes: 48 additions & 2 deletions src/queue/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use crate::entity::pull_request::{
use crate::github::types::GhPullRequest;
use crate::types::{BatchStatus, PrStatus};

/// Find a PR currently in the queue.
/// Find a PR currently in the queue (queued status only).
pub async fn find_queued_pr(
db: &Db,
owner: &str,
Expand All @@ -30,6 +30,28 @@ pub async fn find_queued_pr(
.map_err(|e| DbError(e).into_api_error())
}

/// Find a PR in any active status (queued, testing, batched).
pub async fn find_active_pr(
db: &Db,
owner: &str,
repo: &str,
pr_number: i32,
) -> std::result::Result<Option<PrModel>, Error> {
let active_statuses = vec![
PrStatus::Queued.to_string(),
PrStatus::Testing.to_string(),
PrStatus::Batched.to_string(),
];
PullRequest::find()
.filter(PrColumn::RepoOwner.eq(owner))
.filter(PrColumn::RepoName.eq(repo))
.filter(PrColumn::PrNumber.eq(pr_number))
.filter(PrColumn::Status.is_in(active_statuses))
.one(db.conn())
.await
.map_err(|e| DbError(e).into_api_error())
}

/// Add a PR to the merge queue. No-op if already queued.
pub async fn enqueue(
db: &Db,
Expand Down Expand Up @@ -68,13 +90,14 @@ pub async fn enqueue(
}

/// Remove a PR from the queue by marking it as cancelled.
/// Works on any active status (queued, testing, batched).
pub async fn dequeue(
db: &Db,
owner: &str,
repo: &str,
pr_number: i32,
) -> std::result::Result<(), Error> {
let existing = find_queued_pr(db, owner, repo, pr_number).await?;
let existing = find_active_pr(db, owner, repo, pr_number).await?;

if let Some(pr) = existing {
let mut active = pr.into_active_model();
Expand Down Expand Up @@ -228,6 +251,29 @@ pub async fn update_batch_status(
Ok(())
}

/// Recover orphaned PRs stuck in testing/batched back to queued.
/// Called on runner startup to handle interrupted merges.
pub async fn recover_orphaned(db: &Db) -> std::result::Result<usize, Error> {
let stuck_statuses = vec![PrStatus::Testing.to_string(), PrStatus::Batched.to_string()];
let stuck = PullRequest::find()
.filter(PrColumn::Status.is_in(stuck_statuses))
.all(db.conn())
.await
.map_err(|e| DbError(e).into_api_error())?;

let count = stuck.len();
for pr in stuck {
let mut active = pr.into_active_model();
active.status = Set(PrStatus::Queued.to_string());
active
.update(db.conn())
.await
.map_err(|e| DbError(e).into_api_error())?;
}

Ok(count)
}

/// Write an entry to the merge_events audit log.
pub async fn log_event(
db: &Db,
Expand Down