Skip to content

Commit c425581

Browse files
authored
Merge pull request #1073 from rust-lang/log-on-aborted-request
Use a drop guard to log aborted futures
2 parents 95cd694 + 3792d39 commit c425581

File tree

3 files changed

+61
-40
lines changed

3 files changed

+61
-40
lines changed

ui/src/request_database.rs

Lines changed: 34 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -174,7 +174,7 @@ impl Handle {
174174
rx.await.context(RecvStartRequestSnafu)?.map_err(Into::into)
175175
}
176176

177-
pub async fn attempt_start_request(
177+
async fn attempt_start_request(
178178
&self,
179179
category: impl Into<String>,
180180
payload: impl Into<String>,
@@ -200,11 +200,43 @@ impl Handle {
200200
rx.await.context(RecvEndRequestSnafu)?.map_err(Into::into)
201201
}
202202

203-
pub async fn attempt_end_request(&self, id: Id, how: How) {
203+
async fn attempt_end_request(&self, id: Id, how: How) {
204204
if let Err(err) = self.end_request(id, how).await {
205205
warn!(?err, "Unable to record end request");
206206
}
207207
}
208+
209+
pub async fn start_with_guard(
210+
self,
211+
category: impl Into<String>,
212+
payload: impl Into<String>,
213+
) -> EndGuard {
214+
let g = self
215+
.attempt_start_request(category, payload)
216+
.await
217+
.map(|id| EndGuardInner(id, How::Abandoned, self));
218+
EndGuard(g)
219+
}
220+
}
221+
222+
pub struct EndGuard(Option<EndGuardInner>);
223+
224+
impl EndGuard {
225+
pub fn complete_now(mut self) {
226+
if let Some(mut inner) = self.0.take() {
227+
inner.1 = How::Complete;
228+
drop(inner);
229+
}
230+
}
231+
}
232+
233+
struct EndGuardInner(Id, How, Handle);
234+
235+
impl Drop for EndGuardInner {
236+
fn drop(&mut self) {
237+
let Self(id, how, ref handle) = *self;
238+
futures::executor::block_on(handle.attempt_end_request(id, how))
239+
}
208240
}
209241

210242
#[derive(Debug, Snafu)]

ui/src/server_axum.rs

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ use crate::{
44
record_metric, track_metric_no_request_async, Endpoint, HasLabelsCore, Outcome,
55
UNAVAILABLE_WS,
66
},
7-
request_database::{Handle, How},
7+
request_database::Handle,
88
Config, GhToken, MetricsToken,
99
};
1010
use async_trait::async_trait;
@@ -198,13 +198,11 @@ where
198198
{
199199
let category = format!("http.{}", <&str>::from(T::ENDPOINT));
200200
let payload = serde_json::to_string(&req).unwrap_or_else(|_| String::from("<invalid JSON>"));
201-
let id = db.attempt_start_request(category, payload).await;
201+
let guard = db.start_with_guard(category, payload).await;
202202

203203
let r = f(req).await;
204204

205-
if let Some(id) = id {
206-
db.attempt_end_request(id, How::Complete).await;
207-
}
205+
guard.complete_now();
208206

209207
r
210208
}

ui/src/server_axum/websocket.rs

Lines changed: 24 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use crate::{
22
metrics::{self, record_metric, Endpoint, HasLabelsCore, Outcome},
3-
request_database::{Handle, How},
3+
request_database::Handle,
44
server_axum::api_orchestrator_integration_impls::*,
55
};
66

@@ -389,12 +389,6 @@ async fn handle_core(
389389
resp = rx.recv() => {
390390
let resp = resp.expect("The rx should never close as we have a tx");
391391

392-
if let Ok(MessageResponse::ExecuteEnd { meta, .. }) = &resp {
393-
if let Some((_, _, Some(db_id))) = active_executions.get(&meta.sequence_number) {
394-
db.attempt_end_request(*db_id, How::Complete).await;
395-
}
396-
}
397-
398392
let success = resp.is_ok();
399393
let resp = resp.unwrap_or_else(error_to_response);
400394
let resp = response_to_message(resp);
@@ -443,7 +437,7 @@ async fn handle_core(
443437
_ = active_execution_gc_interval.tick() => {
444438
active_executions = mem::take(&mut active_executions)
445439
.into_iter()
446-
.filter(|(_id, (_, tx, _))| tx.as_ref().map_or(false, |tx| !tx.is_closed()))
440+
.filter(|(_id, (_, tx))| tx.as_ref().map_or(false, |tx| !tx.is_closed()))
447441
.collect();
448442
},
449443

@@ -464,12 +458,6 @@ async fn handle_core(
464458
}
465459
}
466460

467-
for (_, (_, _, db_id)) in active_executions {
468-
if let Some(db_id) = db_id {
469-
db.attempt_end_request(db_id, How::Abandoned).await;
470-
}
471-
}
472-
473461
drop((tx, rx, socket));
474462
if let Err(e) = manager.shutdown().await {
475463
error!("Could not shut down the Coordinator: {e:?}");
@@ -516,11 +504,7 @@ fn response_to_message(response: MessageResponse) -> Message {
516504
Message::Text(resp)
517505
}
518506

519-
type ActiveExecutionInfo = (
520-
CancellationToken,
521-
Option<mpsc::Sender<String>>,
522-
Option<crate::request_database::Id>,
523-
);
507+
type ActiveExecutionInfo = (CancellationToken, Option<mpsc::Sender<String>>);
524508

525509
async fn handle_msg(
526510
txt: String,
@@ -538,22 +522,31 @@ async fn handle_msg(
538522
let token = CancellationToken::new();
539523
let (execution_tx, execution_rx) = mpsc::channel(8);
540524

541-
let id = db.attempt_start_request("ws.Execute", &txt).await;
525+
let guard = db.clone().start_with_guard("ws.Execute", &txt).await;
542526

543-
active_executions.insert(
544-
meta.sequence_number,
545-
(token.clone(), Some(execution_tx), id),
546-
);
527+
active_executions.insert(meta.sequence_number, (token.clone(), Some(execution_tx)));
547528

548529
// TODO: Should a single execute / build / etc. session have a timeout of some kind?
549530
let spawned = manager
550531
.spawn({
551532
let tx = tx.clone();
552533
let meta = meta.clone();
553-
|coordinator| {
554-
handle_execute(token, execution_rx, tx, coordinator, payload, meta.clone())
555-
.context(StreamingExecuteSnafu)
556-
.map_err(|e| (e, Some(meta)))
534+
|coordinator| async {
535+
let r = handle_execute(
536+
token,
537+
execution_rx,
538+
tx,
539+
coordinator,
540+
payload,
541+
meta.clone(),
542+
)
543+
.context(StreamingExecuteSnafu)
544+
.map_err(|e| (e, Some(meta)))
545+
.await;
546+
547+
guard.complete_now();
548+
549+
r
557550
}
558551
})
559552
.await
@@ -565,8 +558,7 @@ async fn handle_msg(
565558
}
566559

567560
Ok(ExecuteStdin { payload, meta }) => {
568-
let Some((_, Some(execution_tx), _)) = active_executions.get(&meta.sequence_number)
569-
else {
561+
let Some((_, Some(execution_tx))) = active_executions.get(&meta.sequence_number) else {
570562
warn!("Received stdin for an execution that is no longer active");
571563
return;
572564
};
@@ -582,8 +574,7 @@ async fn handle_msg(
582574
}
583575

584576
Ok(ExecuteStdinClose { meta }) => {
585-
let Some((_, execution_tx, _)) = active_executions.get_mut(&meta.sequence_number)
586-
else {
577+
let Some((_, execution_tx)) = active_executions.get_mut(&meta.sequence_number) else {
587578
warn!("Received stdin close for an execution that is no longer active");
588579
return;
589580
};
@@ -592,7 +583,7 @@ async fn handle_msg(
592583
}
593584

594585
Ok(ExecuteKill { meta }) => {
595-
let Some((token, _, _)) = active_executions.get(&meta.sequence_number) else {
586+
let Some((token, _)) = active_executions.get(&meta.sequence_number) else {
596587
warn!("Received kill for an execution that is no longer active");
597588
return;
598589
};

0 commit comments

Comments
 (0)