Skip to content

Commit 7ac2496

Browse files
authored
Merge pull request #482 from Kobzol/api-queue
Add simple API JSON endpoint for merge queue pull requests
2 parents 04b6f5d + 230f5b4 commit 7ac2496

File tree

8 files changed

+372
-212
lines changed

8 files changed

+372
-212
lines changed

src/bin/bors.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,10 @@ use std::sync::Arc;
55
use std::time::Duration;
66

77
use anyhow::Context;
8+
use bors::server::{OAuthConfig, ServerState, create_app};
89
use bors::{
9-
BorsContext, BorsGlobalEvent, BorsProcess, CommandParser, OAuthConfig, PgDbClient, ServerState,
10-
TeamApiClient, TreeState, WebhookSecret, create_app, create_bors_process, create_github_client,
11-
load_repositories,
10+
BorsContext, BorsGlobalEvent, BorsProcess, CommandParser, PgDbClient, TeamApiClient, TreeState,
11+
WebhookSecret, create_bors_process, create_github_client, load_repositories,
1212
};
1313
use clap::Parser;
1414
use sqlx::postgres::PgConnectOptions;

src/github/mod.rs

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,14 +9,13 @@ use url::Url;
99
pub mod api;
1010
mod error;
1111
mod labels;
12-
mod rollup;
13-
pub mod server;
14-
mod webhook;
12+
pub mod process;
13+
pub mod rollup;
1514

15+
pub use crate::server::webhook::WebhookSecret;
1616
pub use api::operations::{MergeResult, attempt_merge};
1717
pub use error::AppError;
1818
pub use labels::{LabelModification, LabelTrigger};
19-
pub use webhook::WebhookSecret;
2019

2120
use crate::bors::PullRequestStatus;
2221

src/github/process.rs

Lines changed: 186 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,186 @@
1+
use crate::bors::merge_queue::{MergeQueueSender, start_merge_queue};
2+
use crate::bors::mergeability_queue::{
3+
MergeabilityQueueReceiver, MergeabilityQueueSender, check_mergeability,
4+
create_mergeability_queue,
5+
};
6+
use crate::bors::{handle_bors_global_event, handle_bors_repository_event};
7+
use crate::{BorsContext, BorsGlobalEvent, BorsRepositoryEvent, TeamApiClient};
8+
use anyhow::Error;
9+
use octocrab::Octocrab;
10+
use std::pin::Pin;
11+
use std::sync::Arc;
12+
use tokio::sync::mpsc;
13+
use tracing::{Instrument, Span};
14+
15+
pub struct BorsProcess {
16+
pub repository_tx: mpsc::Sender<BorsRepositoryEvent>,
17+
pub global_tx: mpsc::Sender<BorsGlobalEvent>,
18+
pub merge_queue_tx: MergeQueueSender,
19+
pub mergeability_queue_tx: MergeabilityQueueSender,
20+
pub bors_process: Pin<Box<dyn Future<Output = ()> + Send>>,
21+
}
22+
23+
/// Creates a future with a Bors process that continuously receives webhook events and reacts to
24+
/// them.
25+
pub fn create_bors_process(
26+
ctx: BorsContext,
27+
gh_client: Octocrab,
28+
team_api: TeamApiClient,
29+
merge_queue_max_interval: chrono::Duration,
30+
) -> BorsProcess {
31+
let (repository_tx, repository_rx) = mpsc::channel::<BorsRepositoryEvent>(1024);
32+
let (global_tx, global_rx) = mpsc::channel::<BorsGlobalEvent>(1024);
33+
let (mergeability_queue_tx, mergeability_queue_rx) = create_mergeability_queue();
34+
let mergeability_queue_tx2 = mergeability_queue_tx.clone();
35+
36+
let ctx = Arc::new(ctx);
37+
38+
let (merge_queue_tx, merge_queue_fut) =
39+
start_merge_queue(ctx.clone(), merge_queue_max_interval);
40+
let merge_queue_tx2 = merge_queue_tx.clone();
41+
42+
let service = async move {
43+
// In tests, we shutdown these futures by dropping the channel sender,
44+
// In that case, we need to wait until both of these futures resolve,
45+
// to make sure that they are able to handle all the events in the queue
46+
// before finishing.
47+
#[cfg(test)]
48+
{
49+
let _ = tokio::join!(
50+
consume_repository_events(
51+
ctx.clone(),
52+
repository_rx,
53+
mergeability_queue_tx2.clone(),
54+
merge_queue_tx2.clone()
55+
),
56+
consume_global_events(
57+
ctx.clone(),
58+
global_rx,
59+
mergeability_queue_tx2,
60+
merge_queue_tx2,
61+
gh_client,
62+
team_api
63+
),
64+
consume_mergeability_queue(ctx.clone(), mergeability_queue_rx),
65+
merge_queue_fut
66+
);
67+
}
68+
// In real execution, the bot runs forever. If there is something that finishes
69+
// the futures early, it's essentially a bug.
70+
// FIXME: maybe we could just use join for both versions of the code.
71+
#[cfg(not(test))]
72+
{
73+
tokio::select! {
74+
_ = consume_repository_events(ctx.clone(), repository_rx, mergeability_queue_tx2.clone(), merge_queue_tx2.clone()) => {
75+
tracing::error!("Repository event handling process has ended");
76+
}
77+
_ = consume_global_events(ctx.clone(), global_rx, mergeability_queue_tx2, merge_queue_tx2, gh_client, team_api) => {
78+
tracing::error!("Global event handling process has ended");
79+
}
80+
_ = consume_mergeability_queue(ctx.clone(), mergeability_queue_rx) => {
81+
tracing::error!("Mergeability queue handling process has ended")
82+
}
83+
_ = merge_queue_fut => {
84+
tracing::error!("Merge queue handling process has ended");
85+
}
86+
}
87+
}
88+
};
89+
90+
BorsProcess {
91+
repository_tx,
92+
global_tx,
93+
mergeability_queue_tx,
94+
merge_queue_tx,
95+
bors_process: Box::pin(service),
96+
}
97+
}
98+
99+
async fn consume_repository_events(
100+
ctx: Arc<BorsContext>,
101+
mut repository_rx: mpsc::Receiver<BorsRepositoryEvent>,
102+
mergeability_queue_tx: MergeabilityQueueSender,
103+
merge_queue_tx: MergeQueueSender,
104+
) {
105+
while let Some(event) = repository_rx.recv().await {
106+
let ctx = ctx.clone();
107+
let mergeability_queue_tx = mergeability_queue_tx.clone();
108+
109+
let span = tracing::info_span!("RepositoryEvent");
110+
tracing::debug!("Received repository event: {event:?}");
111+
if let Err(error) =
112+
handle_bors_repository_event(event, ctx, mergeability_queue_tx, merge_queue_tx.clone())
113+
.instrument(span.clone())
114+
.await
115+
{
116+
handle_root_error(span, error);
117+
}
118+
}
119+
}
120+
121+
async fn consume_global_events(
122+
ctx: Arc<BorsContext>,
123+
mut global_rx: mpsc::Receiver<BorsGlobalEvent>,
124+
mergeability_queue_tx: MergeabilityQueueSender,
125+
merge_queue_tx: MergeQueueSender,
126+
gh_client: Octocrab,
127+
team_api: TeamApiClient,
128+
) {
129+
while let Some(event) = global_rx.recv().await {
130+
let ctx = ctx.clone();
131+
let mergeability_queue_tx = mergeability_queue_tx.clone();
132+
let merge_queue_tx = merge_queue_tx.clone();
133+
134+
let span = tracing::info_span!("GlobalEvent");
135+
tracing::trace!("Received global event: {event:?}");
136+
if let Err(error) = handle_bors_global_event(
137+
event,
138+
ctx,
139+
&gh_client,
140+
&team_api,
141+
mergeability_queue_tx,
142+
merge_queue_tx,
143+
)
144+
.instrument(span.clone())
145+
.await
146+
{
147+
handle_root_error(span, error);
148+
}
149+
}
150+
}
151+
152+
async fn consume_mergeability_queue(
153+
ctx: Arc<BorsContext>,
154+
mergeability_queue_rx: MergeabilityQueueReceiver,
155+
) {
156+
while let Some((mq_item, mq_tx)) = mergeability_queue_rx.dequeue().await {
157+
let ctx = ctx.clone();
158+
159+
let span = tracing::debug_span!(
160+
"Mergeability check",
161+
repo = mq_item.pull_request.repo.to_string(),
162+
pr = mq_item.pull_request.pr_number.0,
163+
attempt = mq_item.attempt
164+
);
165+
if let Err(error) = check_mergeability(ctx, mq_tx, mq_item)
166+
.instrument(span.clone())
167+
.await
168+
{
169+
handle_root_error(span, error);
170+
}
171+
}
172+
}
173+
174+
#[allow(unused_variables)]
175+
fn handle_root_error(span: Span, error: Error) {
176+
// In tests, we want to panic on all errors.
177+
#[cfg(test)]
178+
{
179+
panic!("Handler failed: {error:?}");
180+
}
181+
#[cfg(not(test))]
182+
{
183+
use crate::utils::logging::LogError;
184+
span.log_error(error);
185+
}
186+
}

src/github/rollup.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use super::GithubRepoName;
22
use super::error::AppError;
3-
use super::server::ServerStateRef;
3+
use crate::server::ServerStateRef;
44
use anyhow::Context;
55
use axum::extract::{Query, State};
66
use axum::http::StatusCode;

src/lib.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ mod config;
66
mod database;
77
mod github;
88
mod permissions;
9+
pub mod server;
910
mod templates;
1011
mod utils;
1112

@@ -15,9 +16,12 @@ pub use github::{
1516
AppError, WebhookSecret,
1617
api::create_github_client,
1718
api::load_repositories,
18-
server::{BorsProcess, OAuthConfig, ServerState, create_app, create_bors_process},
19+
process::{BorsProcess, create_bors_process},
1920
};
2021
pub use permissions::TeamApiClient;
22+
pub use server::OAuthConfig;
23+
pub use server::ServerState;
24+
pub use server::create_app;
2125

2226
#[cfg(test)]
2327
mod tests;

0 commit comments

Comments
 (0)