Skip to content

Commit f50891c

Browse files
NathanFlurry authored and MasterPtato committed
chore(rivetkit): implement new hibernating ws protocol
1 parent 6d04024 commit f50891c

File tree

68 files changed

+4126
-3203
lines changed

Some content is hidden

Large commits have some content hidden by default. Use the search box below for content that may be hidden.

68 files changed

+4126
-3203
lines changed

engine/artifacts/errors/actor.not_found.json

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

engine/packages/actor-kv/tests/list_edge_cases.rs

Lines changed: 27 additions & 32 deletions
Original file line number | Diff line number | Diff line change
@@ -64,7 +64,11 @@ async fn test_list_edge_cases() -> Result<()> {
6464
None,
6565
)
6666
.await?;
67-
assert_eq!(no_match.len(), 0, "should return empty for non-matching prefix");
67+
assert_eq!(
68+
no_match.len(),
69+
0,
70+
"should return empty for non-matching prefix"
71+
);
6872

6973
// Test 3: Range where start > end (should return empty)
7074
tracing::info!("test 3: range where start > end");
@@ -137,8 +141,7 @@ async fn test_list_edge_cases() -> Result<()> {
137141
)
138142
.await?;
139143

140-
let (null_keys, null_values, _) =
141-
kv::get(db, actor_id, vec![null_key.clone()]).await?;
144+
let (null_keys, null_values, _) = kv::get(db, actor_id, vec![null_key.clone()]).await?;
142145
assert_eq!(null_keys.len(), 1, "should retrieve key with null byte");
143146
assert_eq!(null_values[0], b"null_value");
144147

@@ -191,9 +194,7 @@ async fn test_list_edge_cases() -> Result<()> {
191194
let (empty_prefix, _, _) = kv::list(
192195
db,
193196
actor_id,
194-
rp::KvListQuery::KvListPrefixQuery(rp::KvListPrefixQuery {
195-
key: vec![],
196-
}),
197+
rp::KvListQuery::KvListPrefixQuery(rp::KvListPrefixQuery { key: vec![] }),
197198
false,
198199
None,
199200
)
@@ -204,13 +205,7 @@ async fn test_list_edge_cases() -> Result<()> {
204205

205206
// Test 8: Prefix longer than any stored key
206207
tracing::info!("test 8: prefix longer than stored keys");
207-
kv::put(
208-
db,
209-
actor_id,
210-
vec![b"ab".to_vec()],
211-
vec![b"val".to_vec()],
212-
)
213-
.await?;
208+
kv::put(db, actor_id, vec![b"ab".to_vec()], vec![b"val".to_vec()]).await?;
214209

215210
let (long_prefix, _, _) = kv::list(
216211
db,
@@ -295,14 +290,26 @@ async fn test_list_edge_cases() -> Result<()> {
295290
)
296291
.await?;
297292

298-
let (zero_limit, _, _) =
299-
kv::list(db, actor_id, rp::KvListQuery::KvListAllQuery, false, Some(0)).await?;
293+
let (zero_limit, _, _) = kv::list(
294+
db,
295+
actor_id,
296+
rp::KvListQuery::KvListAllQuery,
297+
false,
298+
Some(0),
299+
)
300+
.await?;
300301
assert_eq!(zero_limit.len(), 0, "limit of 0 should return empty");
301302

302303
// Test 11: Limit of 1
303304
tracing::info!("test 11: limit of 1");
304-
let (one_limit, _, _) =
305-
kv::list(db, actor_id, rp::KvListQuery::KvListAllQuery, false, Some(1)).await?;
305+
let (one_limit, _, _) = kv::list(
306+
db,
307+
actor_id,
308+
rp::KvListQuery::KvListAllQuery,
309+
false,
310+
Some(1),
311+
)
312+
.await?;
306313
assert_eq!(one_limit.len(), 1, "limit of 1 should return 1 key");
307314

308315
// Test 12: Limit larger than total keys
@@ -328,18 +335,8 @@ async fn test_list_edge_cases() -> Result<()> {
328335
kv::put(
329336
db,
330337
actor_id,
331-
vec![
332-
b"a".to_vec(),
333-
b"b".to_vec(),
334-
b"c".to_vec(),
335-
b"d".to_vec(),
336-
],
337-
vec![
338-
b"1".to_vec(),
339-
b"2".to_vec(),
340-
b"3".to_vec(),
341-
b"4".to_vec(),
342-
],
338+
vec![b"a".to_vec(), b"b".to_vec(), b"c".to_vec(), b"d".to_vec()],
339+
vec![b"1".to_vec(), b"2".to_vec(), b"3".to_vec(), b"4".to_vec()],
343340
)
344341
.await?;
345342

@@ -359,9 +356,7 @@ async fn test_list_edge_cases() -> Result<()> {
359356
let (prefix_reverse, _, _) = kv::list(
360357
db,
361358
actor_id,
362-
rp::KvListQuery::KvListPrefixQuery(rp::KvListPrefixQuery {
363-
key: vec![],
364-
}),
359+
rp::KvListQuery::KvListPrefixQuery(rp::KvListPrefixQuery { key: vec![] }),
365360
true,
366361
None,
367362
)

engine/packages/guard/src/errors.rs

Lines changed: 0 additions & 22 deletions
Original file line number | Diff line number | Diff line change
@@ -38,28 +38,6 @@ pub struct WrongAddrProtocol {
3838
pub received: &'static str,
3939
}
4040

41-
#[derive(RivetError, Serialize)]
42-
#[error(
43-
"guard",
44-
"actor_not_found",
45-
"Actor not found.",
46-
"Actor with ID {actor_id} not found."
47-
)]
48-
pub struct ActorNotFound {
49-
pub actor_id: Id,
50-
}
51-
52-
#[derive(RivetError, Serialize)]
53-
#[error(
54-
"guard",
55-
"actor_destroyed",
56-
"Actor destroyed.",
57-
"Actor {actor_id} was destroyed."
58-
)]
59-
pub struct ActorDestroyed {
60-
pub actor_id: Id,
61-
}
62-
6341
#[derive(RivetError, Serialize)]
6442
#[error(
6543
"guard",

engine/packages/guard/src/routing/pegboard_gateway.rs

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -158,11 +158,11 @@ async fn route_request_inner(
158158
.op(pegboard::ops::actor::get_for_gateway::Input { actor_id })
159159
.await?
160160
else {
161-
return Err(errors::ActorNotFound { actor_id }.build());
161+
return Err(pegboard::errors::Actor::NotFound.build());
162162
};
163163

164164
if actor.destroyed {
165-
return Err(errors::ActorDestroyed { actor_id }.build());
165+
return Err(pegboard::errors::Actor::NotFound.build());
166166
}
167167

168168
// Wake actor if sleeping

engine/packages/pegboard-gateway/src/lib.rs

Lines changed: 12 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -34,7 +34,8 @@ use crate::shared_state::{InFlightRequestHandle, SharedState};
3434

3535
pub mod shared_state;
3636

37-
const TUNNEL_ACK_TIMEOUT: Duration = Duration::from_secs(2);
37+
const WEBSOCKET_OPEN_TIMEOUT: Duration = Duration::from_secs(15);
38+
const TUNNEL_ACK_TIMEOUT: Duration = Duration::from_secs(5);
3839

3940
#[derive(RivetError, Serialize, Deserialize)]
4041
#[error(
@@ -230,7 +231,7 @@ impl CustomServeTrait for PegboardGateway {
230231

231232
Err(ServiceUnavailable.build())
232233
};
233-
let response_start = tokio::time::timeout(TUNNEL_ACK_TIMEOUT, fut)
234+
let response_start = tokio::time::timeout(WEBSOCKET_OPEN_TIMEOUT, fut)
234235
.await
235236
.map_err(|_| {
236237
tracing::warn!("timed out waiting for tunnel ack");
@@ -412,6 +413,11 @@ impl CustomServeTrait for PegboardGateway {
412413
client_ws.send(msg).await?;
413414
}
414415
protocol::ToServerTunnelMessageKind::ToServerWebSocketMessageAck(ack) => {
416+
tracing::debug!(
417+
request_id=?Uuid::from_bytes(request_id),
418+
ack_index=?ack.index,
419+
"received WebSocketMessageAck from runner"
420+
);
415421
shared_state
416422
.ack_pending_websocket_messages(request_id, ack.index)
417423
.await?;
@@ -617,6 +623,10 @@ impl CustomServeTrait for PegboardGateway {
617623
.has_pending_websocket_messages(unique_request_id.into_bytes())
618624
.await?
619625
{
626+
tracing::debug!(
627+
?unique_request_id,
628+
"detected pending requests on websocket hibernation, rewaking actor"
629+
);
620630
return Ok(HibernationResult::Continue);
621631
}
622632

engine/packages/pegboard-gateway/src/shared_state.rs

Lines changed: 58 additions & 16 deletions
Original file line number | Diff line number | Diff line change
@@ -15,7 +15,8 @@ use crate::WebsocketPendingLimitReached;
1515

1616
const GC_INTERVAL: Duration = Duration::from_secs(15);
1717
const MESSAGE_ACK_TIMEOUT: Duration = Duration::from_secs(30);
18-
const MAX_PENDING_MSGS_SIZE_PER_REQ: u64 = util::size::mebibytes(1);
18+
const HWS_MESSAGE_ACK_TIMEOUT: Duration = Duration::from_secs(30);
19+
const HWS_MAX_PENDING_MSGS_SIZE_PER_REQ: u64 = util::size::mebibytes(1);
1920

2021
pub struct InFlightRequestHandle {
2122
pub msg_rx: mpsc::Receiver<protocol::ToServerTunnelMessageKind>,
@@ -185,7 +186,7 @@ impl SharedState {
185186
if let (Some(hs), Some(ws_msg_index)) = (&mut req.hibernation_state, ws_msg_index) {
186187
hs.total_pending_ws_msgs_size += message_serialized.len() as u64;
187188

188-
if hs.total_pending_ws_msgs_size > MAX_PENDING_MSGS_SIZE_PER_REQ
189+
if hs.total_pending_ws_msgs_size > HWS_MAX_PENDING_MSGS_SIZE_PER_REQ
189190
|| hs.pending_ws_msgs.len() >= u16::MAX as usize
190191
{
191192
return Err(WebsocketPendingLimitReached {}.build());
@@ -230,40 +231,54 @@ impl SharedState {
230231
let Some(mut in_flight) =
231232
self.in_flight_requests.get_async(&msg.request_id).await
232233
else {
233-
tracing::debug!(
234+
tracing::warn!(
234235
request_id=?Uuid::from_bytes(msg.request_id),
235-
"in flight has already been disconnected"
236+
message_id=?Uuid::from_bytes(msg.message_id),
237+
"in flight has already been disconnected, cannot ack message"
236238
);
237239
continue;
238240
};
239241

240242
if let protocol::ToServerTunnelMessageKind::TunnelAck = &msg.message_kind {
241243
let prev_len = in_flight.pending_msgs.len();
242244

245+
tracing::debug!(message_id=?Uuid::from_bytes(msg.message_id), "received tunnel ack");
246+
243247
in_flight
244248
.pending_msgs
245249
.retain(|m| m.message_id != msg.message_id);
246250

247251
if prev_len == in_flight.pending_msgs.len() {
248252
tracing::warn!(
253+
request_id=?Uuid::from_bytes(msg.request_id),
254+
message_id=?Uuid::from_bytes(msg.message_id),
249255
"pending message does not exist or ack received after message body"
250256
)
257+
} else {
258+
tracing::debug!(
259+
request_id=?Uuid::from_bytes(msg.request_id),
260+
message_id=?Uuid::from_bytes(msg.message_id),
261+
"received TunnelAck, removed from pending"
262+
);
251263
}
252264
} else {
253265
// Send message to the request handler to emulate the real network action
254266
tracing::debug!(
255267
request_id=?Uuid::from_bytes(msg.request_id),
268+
message_id=?Uuid::from_bytes(msg.message_id),
256269
"forwarding message to request handler"
257270
);
258-
let _ = in_flight.msg_tx.send(msg.message_kind).await;
271+
let _ = in_flight.msg_tx.send(msg.message_kind.clone()).await;
259272

260273
// Send ack back to runner
261274
let ups_clone = self.ups.clone();
262275
let receiver_subject = in_flight.receiver_subject.clone();
276+
let request_id = msg.request_id;
277+
let message_id = msg.message_id;
263278
let ack_message = protocol::ToClient::ToClientTunnelMessage(
264279
protocol::ToClientTunnelMessage {
265-
request_id: msg.request_id,
266-
message_id: msg.message_id,
280+
request_id,
281+
message_id,
267282
gateway_reply_to: None,
268283
message_kind: protocol::ToClientTunnelMessageKind::TunnelAck,
269284
},
@@ -279,15 +294,29 @@ impl SharedState {
279294
}
280295
};
281296
tokio::spawn(async move {
282-
if let Err(err) = ups_clone
297+
match ups_clone
283298
.publish(
284299
&receiver_subject,
285300
&ack_message_serialized,
286301
PublishOpts::one(),
287302
)
288303
.await
289304
{
290-
tracing::warn!(?err, "failed to ack message")
305+
Ok(_) => {
306+
tracing::debug!(
307+
request_id=?Uuid::from_bytes(request_id),
308+
message_id=?Uuid::from_bytes(message_id),
309+
"sent TunnelAck to runner"
310+
);
311+
}
312+
Err(err) => {
313+
tracing::warn!(
314+
?err,
315+
request_id=?Uuid::from_bytes(request_id),
316+
message_id=?Uuid::from_bytes(message_id),
317+
"failed to send TunnelAck to runner"
318+
);
319+
}
291320
}
292321
});
293322
}
@@ -366,11 +395,15 @@ impl SharedState {
366395
};
367396

368397
let Some(hs) = &mut req.hibernation_state else {
369-
tracing::warn!("cannot ack ws messages, hibernation is not enabled");
398+
tracing::warn!(
399+
request_id=?Uuid::from_bytes(request_id),
400+
"cannot ack ws messages, hibernation is not enabled"
401+
);
370402
return Ok(());
371403
};
372404

373-
let len = hs.pending_ws_msgs.len().try_into()?;
405+
let len_before = hs.pending_ws_msgs.len();
406+
let len = len_before.try_into()?;
374407
let mut iter_index = 0u16;
375408
hs.pending_ws_msgs.retain(|_| {
376409
let msg_index = hs
@@ -385,6 +418,15 @@ impl SharedState {
385418
keep
386419
});
387420

421+
let len_after = hs.pending_ws_msgs.len();
422+
tracing::debug!(
423+
request_id=?Uuid::from_bytes(request_id),
424+
ack_index,
425+
removed_count=len_before - len_after,
426+
remaining_count=len_after,
427+
"acked pending websocket messages"
428+
);
429+
388430
Ok(())
389431
}
390432

@@ -425,9 +467,9 @@ impl SharedState {
425467
/// Gateway channel is closed and there are no pending messages
426468
GatewayClosed,
427469
/// Any tunnel message not acked (TunnelAck)
428-
MessageNotAcked,
470+
MessageNotAcked { message_id: Uuid },
429471
/// WebSocket pending messages (ToServerWebSocketMessageAck)
430-
WebSocketMessageNotAcked,
472+
WebSocketMessageNotAcked { last_ws_msg_index: u16 },
431473
}
432474

433475
let now = Instant::now();
@@ -460,17 +502,17 @@ impl SharedState {
460502
if now.duration_since(earliest_pending_msg.send_instant)
461503
<= MESSAGE_ACK_TIMEOUT
462504
{
463-
break 'reason Some(MsgGcReason::MessageNotAcked);
505+
break 'reason Some(MsgGcReason::MessageNotAcked{message_id:Uuid::from_bytes(earliest_pending_msg.message_id)});
464506
}
465507
}
466508

467509
if let Some(hs) = &req.hibernation_state
468510
&& let Some(earliest_pending_ws_msg) = hs.pending_ws_msgs.first()
469511
{
470512
if now.duration_since(earliest_pending_ws_msg.send_instant)
471-
<= MESSAGE_ACK_TIMEOUT
513+
<= HWS_MESSAGE_ACK_TIMEOUT
472514
{
473-
break 'reason Some(MsgGcReason::WebSocketMessageNotAcked);
515+
break 'reason Some(MsgGcReason::WebSocketMessageNotAcked{last_ws_msg_index: hs.last_ws_msg_index});
474516
}
475517
}
476518

engine/packages/pegboard-runner/src/ws_to_tunnel_task.rs

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -371,7 +371,7 @@ async fn handle_tunnel_message(
371371
if let Some(req) = active_requests.get(&request_id) {
372372
req.gateway_reply_to.clone()
373373
} else {
374-
tracing::warn!("no active request for tunnel message, may have timed out");
374+
tracing::warn!(request_id=?Uuid::from_bytes(msg.request_id), message_id=?Uuid::from_bytes(msg.message_id), "no active request for tunnel message, may have timed out");
375375
return Ok(());
376376
}
377377
};

0 commit comments

Comments
 (0)