Skip to content

Commit dc9084a

Browse files
committed
fix(guard): handle actor stopped during ws/req
1 parent 00cd5f2 commit dc9084a

File tree

3 files changed

+58
-21
lines changed

3 files changed

+58
-21
lines changed

engine/packages/pegboard-gateway/src/lib.rs

Lines changed: 37 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -152,6 +152,11 @@ impl CustomServeTrait for PegboardGateway {
152152
.context("failed to read body")?
153153
.to_bytes();
154154

155+
let mut stopped_sub = self
156+
.ctx
157+
.subscribe::<pegboard::workflows::actor::Stopped>(("actor_id", self.actor_id))
158+
.await?;
159+
155160
// Build subject to publish to
156161
let tunnel_subject =
157162
pegboard::pubsub_subjects::RunnerReceiverSubject::new(self.runner_id).to_string();
@@ -212,6 +217,10 @@ impl CustomServeTrait for PegboardGateway {
212217
break;
213218
}
214219
}
220+
_ = stopped_sub.next() => {
221+
tracing::debug!("actor stopped while waiting for request response");
222+
return Err(ServiceUnavailable.build());
223+
}
215224
_ = drop_rx.changed() => {
216225
tracing::warn!("tunnel message timeout");
217226
return Err(ServiceUnavailable.build());
@@ -278,6 +287,11 @@ impl CustomServeTrait for PegboardGateway {
278287
}
279288
}
280289

290+
let mut stopped_sub = self
291+
.ctx
292+
.subscribe::<pegboard::workflows::actor::Stopped>(("actor_id", self.actor_id))
293+
.await?;
294+
281295
// Build subject to publish to
282296
let tunnel_subject =
283297
pegboard::pubsub_subjects::RunnerReceiverSubject::new(self.runner_id).to_string();
@@ -339,6 +353,10 @@ impl CustomServeTrait for PegboardGateway {
339353
break;
340354
}
341355
}
356+
_ = stopped_sub.next() => {
357+
tracing::debug!("actor stopped while waiting for websocket open");
358+
return Err(WebSocketServiceUnavailable.build());
359+
}
342360
_ = drop_rx.changed() => {
343361
tracing::warn!("websocket open timeout");
344362
return Err(WebSocketServiceUnavailable.build());
@@ -364,7 +382,7 @@ impl CustomServeTrait for PegboardGateway {
364382
open_msg.can_hibernate
365383
};
366384

367-
// Send reclaimed messages
385+
// Send pending messages
368386
self.shared_state
369387
.resend_pending_websocket_messages(request_id)
370388
.await?;
@@ -415,6 +433,15 @@ impl CustomServeTrait for PegboardGateway {
415433
return Err(WebSocketServiceHibernate.build());
416434
}
417435
}
436+
_ = stopped_sub.next() => {
437+
tracing::debug!("actor stopped during websocket handler loop");
438+
439+
if can_hibernate {
440+
return Err(WebSocketServiceHibernate.build());
441+
} else {
442+
return Err(WebSocketServiceUnavailable.build());
443+
}
444+
}
418445
_ = drop_rx.changed() => {
419446
tracing::warn!("websocket message timeout");
420447
return Err(WebSocketServiceTimeout.build());
@@ -579,6 +606,15 @@ impl CustomServeTrait for PegboardGateway {
579606
client_ws: WebSocketHandle,
580607
unique_request_id: Uuid,
581608
) -> Result<HibernationResult> {
609+
// Immediately rewake if we have pending messages
610+
if self
611+
.shared_state
612+
.has_pending_websocket_messages(unique_request_id.into_bytes())
613+
.await?
614+
{
615+
return Ok(HibernationResult::Continue);
616+
}
617+
582618
// Start keepalive task
583619
let ctx = self.ctx.clone();
584620
let actor_id = self.actor_id;

engine/packages/pegboard-gateway/src/shared_state.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -344,6 +344,18 @@ impl SharedState {
344344
Ok(())
345345
}
346346

347+
pub async fn has_pending_websocket_messages(&self, request_id: RequestId) -> Result<bool> {
348+
let Some(req) = self.in_flight_requests.get_async(&request_id).await else {
349+
bail!("request not in flight");
350+
};
351+
352+
if let Some(hs) = &req.hibernation_state {
353+
Ok(!hs.pending_ws_msgs.is_empty())
354+
} else {
355+
Ok(false)
356+
}
357+
}
358+
347359
pub async fn ack_pending_websocket_messages(
348360
&self,
349361
request_id: RequestId,

engine/packages/pegboard-runner/src/lib.rs

Lines changed: 9 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ use rivet_guard_core::{
1212
use rivet_runner_protocol as protocol;
1313
use std::time::Duration;
1414
use tokio::sync::watch;
15-
use tokio_tungstenite::tungstenite::protocol::frame::{CloseFrame, coding::CloseCode};
15+
use tokio_tungstenite::tungstenite::protocol::frame::CloseFrame;
1616
use universalpubsub::PublishOpts;
1717
use vbare::OwnedVersionedData;
1818

@@ -243,30 +243,19 @@ impl CustomServeTrait for PegboardRunnerWsCustomServe {
243243
);
244244
}
245245

246-
// Send WebSocket close messages to all remaining active requests
246+
// Send close messages to all remaining active requests
247247
let active_requests = conn.tunnel_active_requests.lock().await;
248248
for (request_id, req) in &*active_requests {
249-
let close_msg_kind = if req.is_ws {
250-
let (close_code, close_reason) = if lifecycle_res.is_ok() {
251-
(CloseCode::Normal.into(), None)
252-
} else {
253-
(CloseCode::Error.into(), Some("ws.upstream_closed".into()))
254-
};
255-
256-
protocol::ToServerTunnelMessageKind::ToServerWebSocketClose(
257-
protocol::ToServerWebSocketClose {
258-
code: Some(close_code),
259-
reason: close_reason,
260-
hibernate: true,
261-
},
262-
)
263-
} else {
264-
protocol::ToServerTunnelMessageKind::ToServerResponseAbort
265-
};
249+
// Websockets are not ephemeral like requests. If the runner ws closes they are not informed;
250+
// instead they wait for the actor itself to stop.
251+
if req.is_ws {
252+
continue;
253+
}
254+
266255
let close_message = protocol::ToServerTunnelMessage {
267256
request_id: request_id.clone(),
268257
message_id: Uuid::new_v4().into_bytes(),
269-
message_kind: close_msg_kind,
258+
message_kind: protocol::ToServerTunnelMessageKind::ToServerResponseAbort,
270259
};
271260

272261
let msg_serialized = protocol::versioned::ToGateway::wrap_latest(protocol::ToGateway {

0 commit comments

Comments
 (0)