Skip to content

Commit c020fe1

Browse files
committed
chore(pegboard-gateway): add new message id format & add deprecated tunnel ack
1 parent 4adef6f commit c020fe1

File tree

29 files changed

+995
-440
lines changed

29 files changed

+995
-440
lines changed

Cargo.lock

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

engine/packages/guard-core/src/custom_serve.rs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,13 @@ use bytes::Bytes;
44
use http_body_util::Full;
55
use hyper::{Request, Response};
66
use tokio_tungstenite::tungstenite::protocol::frame::CloseFrame;
7-
use uuid::Uuid;
87

98
use crate::WebSocketHandle;
109
use crate::proxy_service::ResponseBody;
1110
use crate::request_context::RequestContext;
1211

12+
use pegboard::tunnel::id::RequestId;
13+
1314
pub enum HibernationResult {
1415
Continue,
1516
Close,
@@ -23,6 +24,7 @@ pub trait CustomServeTrait: Send + Sync {
2324
&self,
2425
req: Request<Full<Bytes>>,
2526
request_context: &mut RequestContext,
27+
request_id: RequestId,
2628
) -> Result<Response<ResponseBody>>;
2729

2830
/// Handle a WebSocket connection after upgrade. Supports connection retries.
@@ -33,7 +35,7 @@ pub trait CustomServeTrait: Send + Sync {
3335
_path: &str,
3436
_request_context: &mut RequestContext,
3537
// Identifies the websocket across retries.
36-
_unique_request_id: Uuid,
38+
_unique_request_id: RequestId,
3739
// True if this websocket is reconnecting after hibernation.
3840
_after_hibernation: bool,
3941
) -> Result<Option<CloseFrame>> {
@@ -44,7 +46,7 @@ pub trait CustomServeTrait: Send + Sync {
4446
async fn handle_websocket_hibernation(
4547
&self,
4648
_websocket: WebSocketHandle,
47-
_unique_request_id: Uuid,
49+
_unique_request_id: RequestId,
4850
) -> Result<HibernationResult> {
4951
bail!("service does not support websocket hibernation");
5052
}

engine/packages/guard-core/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ pub mod websocket_handle;
1212

1313
pub use cert_resolver::CertResolverFn;
1414
pub use custom_serve::CustomServeTrait;
15+
pub use pegboard::tunnel::id::{RequestId, generate_request_id};
1516
pub use proxy_service::{
1617
CacheKeyFn, MiddlewareFn, ProxyService, ProxyState, RouteTarget, RoutingFn, RoutingOutput,
1718
};

engine/packages/guard-core/src/proxy_service.rs

Lines changed: 47 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,11 @@ use rivet_error::{INTERNAL_ERROR, RivetError};
1313
use rivet_metrics::KeyValue;
1414
use rivet_util::Id;
1515
use serde_json;
16+
17+
use pegboard::tunnel::id::{RequestId, generate_request_id};
1618
use std::{
1719
borrow::Cow,
18-
collections::HashMap as StdHashMap,
20+
collections::{HashMap as StdHashMap, HashSet},
1921
net::SocketAddr,
2022
sync::Arc,
2123
time::{Duration, Instant},
@@ -28,7 +30,6 @@ use tokio_tungstenite::tungstenite::{
2830
};
2931
use tracing::Instrument;
3032
use url::Url;
31-
use uuid::Uuid;
3233

3334
use crate::{
3435
WebSocketHandle,
@@ -349,6 +350,7 @@ pub struct ProxyState {
349350
route_cache: RouteCache,
350351
rate_limiters: Cache<(Id, std::net::IpAddr), Arc<Mutex<RateLimiter>>>,
351352
in_flight_counters: Cache<(Id, std::net::IpAddr), Arc<Mutex<InFlightCounter>>>,
353+
inflight_requests: Arc<Mutex<HashSet<RequestId>>>,
352354
port_type: PortType,
353355
clickhouse_inserter: Option<clickhouse_inserter::ClickHouseInserterHandle>,
354356
tasks: Arc<TaskGroup>,
@@ -377,6 +379,7 @@ impl ProxyState {
377379
.max_capacity(10_000)
378380
.time_to_live(PROXY_STATE_CACHE_TTL)
379381
.build(),
382+
inflight_requests: Arc::new(Mutex::new(HashSet::new())),
380383
port_type,
381384
clickhouse_inserter,
382385
tasks: TaskGroup::new(),
@@ -649,6 +652,42 @@ impl ProxyState {
649652
counter.release();
650653
}
651654
}
655+
656+
/// Generate a unique request ID that is not currently in flight
657+
pub async fn generate_unique_request_id(&self) -> anyhow::Result<RequestId> {
658+
const MAX_TRIES: u32 = 100;
659+
let mut requests = self.inflight_requests.lock().await;
660+
661+
for attempt in 0..MAX_TRIES {
662+
let request_id = generate_request_id();
663+
664+
// Check if this ID is already in use
665+
if !requests.contains(&request_id) {
666+
// Insert the ID and return it
667+
requests.insert(request_id);
668+
return Ok(request_id);
669+
}
670+
671+
// Collision occurred (extremely rare with 4 bytes = 4 billion possibilities)
672+
// Generate a new ID and try again
673+
tracing::warn!(
674+
?request_id,
675+
attempt,
676+
"request id collision, generating new id"
677+
);
678+
}
679+
680+
anyhow::bail!(
681+
"failed to generate unique request id after {} attempts",
682+
MAX_TRIES
683+
);
684+
}
685+
686+
/// Release a request ID when the request is complete
687+
pub async fn release_request_id(&self, request_id: RequestId) {
688+
let mut requests = self.inflight_requests.lock().await;
689+
requests.remove(&request_id);
690+
}
652691
}
653692

654693
// Helper function to choose a random target from a list of targets
@@ -1044,6 +1083,9 @@ impl ProxyService {
10441083
ResolveRouteOutput::CustomServe(mut handler) => {
10451084
let req_headers = req.headers().clone();
10461085

1086+
// Generate unique request ID
1087+
let request_id = self.state.generate_unique_request_id().await?;
1088+
10471089
// Collect request body
10481090
let (req_parts, body) = req.into_parts();
10491091
let collected_body = match http_body_util::BodyExt::collect(body).await {
@@ -1064,7 +1106,7 @@ impl ProxyService {
10641106
attempts += 1;
10651107

10661108
let res = handler
1067-
.handle_request(req_collected.clone(), request_context)
1109+
.handle_request(req_collected.clone(), request_context, request_id)
10681110
.await;
10691111
if should_retry_request(&res) {
10701112
// Request connect error, might retry
@@ -1838,12 +1880,13 @@ impl ProxyService {
18381880
tracing::debug!(%req_path, "Spawning task to handle WebSocket communication");
18391881
let mut request_context = request_context.clone();
18401882
let req_headers = req_headers.clone();
1883+
let state = self.state.clone();
18411884
let req_path = req_path.clone();
18421885
let req_host = req_host.clone();
18431886

18441887
self.state.tasks.spawn(
18451888
async move {
1846-
let request_id = Uuid::new_v4();
1889+
let request_id = state.generate_unique_request_id().await?;
18471890
let mut ws_hibernation_close = false;
18481891
let mut after_hibernation = false;
18491892
let mut attempts = 0u32;

engine/packages/pegboard-gateway/src/lib.rs

Lines changed: 20 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -5,29 +5,30 @@ use futures_util::TryStreamExt;
55
use gas::prelude::*;
66
use http_body_util::{BodyExt, Full};
77
use hyper::{Request, Response, StatusCode};
8+
use pegboard::tunnel::id::{self as tunnel_id, RequestId};
89
use rand::Rng;
910
use rivet_error::*;
1011
use rivet_guard_core::{
11-
WebSocketHandle,
1212
custom_serve::{CustomServeTrait, HibernationResult},
1313
errors::{
1414
ServiceUnavailable, WebSocketServiceHibernate, WebSocketServiceTimeout,
1515
WebSocketServiceUnavailable,
1616
},
17-
proxy_service::{ResponseBody, is_ws_hibernate},
17+
proxy_service::{is_ws_hibernate, ResponseBody},
1818
request_context::RequestContext,
1919
websocket_handle::WebSocketReceiver,
20+
WebSocketHandle,
2021
};
2122
use rivet_runner_protocol as protocol;
2223
use rivet_util::serde::HashableMap;
2324
use std::{sync::Arc, time::Duration};
2425
use tokio::{
25-
sync::{Mutex, watch},
26+
sync::{watch, Mutex},
2627
task::JoinHandle,
2728
};
2829
use tokio_tungstenite::tungstenite::{
30+
protocol::frame::{coding::CloseCode, CloseFrame},
2931
Message,
30-
protocol::frame::{CloseFrame, coding::CloseCode},
3132
};
3233

3334
use crate::shared_state::{InFlightRequestHandle, SharedState};
@@ -86,6 +87,7 @@ impl CustomServeTrait for PegboardGateway {
8687
&self,
8788
req: Request<Full<Bytes>>,
8889
_request_context: &mut RequestContext,
90+
request_id: RequestId,
8991
) -> Result<Response<ResponseBody>> {
9092
// Use the actor ID from the gateway instance
9193
let actor_id = self.actor_id.to_string();
@@ -163,7 +165,6 @@ impl CustomServeTrait for PegboardGateway {
163165
pegboard::pubsub_subjects::RunnerReceiverSubject::new(self.runner_id).to_string();
164166

165167
// Start listening for request responses
166-
let request_id = Uuid::new_v4().into_bytes();
167168
let InFlightRequestHandle {
168169
mut msg_rx,
169170
mut drop_rx,
@@ -212,7 +213,7 @@ impl CustomServeTrait for PegboardGateway {
212213
}
213214
} else {
214215
tracing::warn!(
215-
request_id=?Uuid::from_bytes(request_id),
216+
request_id=?tunnel_id::request_id_to_string(&request_id),
216217
"received no message response during request init",
217218
);
218219
break;
@@ -274,7 +275,7 @@ impl CustomServeTrait for PegboardGateway {
274275
headers: &hyper::HeaderMap,
275276
_path: &str,
276277
_request_context: &mut RequestContext,
277-
unique_request_id: Uuid,
278+
unique_request_id: RequestId,
278279
after_hibernation: bool,
279280
) -> Result<Option<CloseFrame>> {
280281
// Use the actor ID from the gateway instance
@@ -298,7 +299,7 @@ impl CustomServeTrait for PegboardGateway {
298299
pegboard::pubsub_subjects::RunnerReceiverSubject::new(self.runner_id).to_string();
299300

300301
// Start listening for WebSocket messages
301-
let request_id = unique_request_id.into_bytes();
302+
let request_id = unique_request_id;
302303
let InFlightRequestHandle {
303304
mut msg_rx,
304305
mut drop_rx,
@@ -348,7 +349,7 @@ impl CustomServeTrait for PegboardGateway {
348349
}
349350
} else {
350351
tracing::warn!(
351-
request_id=?Uuid::from_bytes(request_id),
352+
request_id=?tunnel_id::request_id_to_string(&request_id),
352353
"received no message response during ws init",
353354
);
354355
break;
@@ -414,7 +415,7 @@ impl CustomServeTrait for PegboardGateway {
414415
}
415416
protocol::ToServerTunnelMessageKind::ToServerWebSocketMessageAck(ack) => {
416417
tracing::debug!(
417-
request_id=?Uuid::from_bytes(request_id),
418+
request_id=?tunnel_id::request_id_to_string(&request_id),
418419
ack_index=?ack.index,
419420
"received WebSocketMessageAck from runner"
420421
);
@@ -477,8 +478,6 @@ impl CustomServeTrait for PegboardGateway {
477478
let ws_message =
478479
protocol::ToClientTunnelMessageKind::ToClientWebSocketMessage(
479480
protocol::ToClientWebSocketMessage {
480-
// NOTE: This gets set in shared_state.ts
481-
index: 0,
482481
data: data.into(),
483482
binary: true,
484483
},
@@ -491,8 +490,6 @@ impl CustomServeTrait for PegboardGateway {
491490
let ws_message =
492491
protocol::ToClientTunnelMessageKind::ToClientWebSocketMessage(
493492
protocol::ToClientWebSocketMessage {
494-
// NOTE: This gets set in shared_state.ts
495-
index: 0,
496493
data: text.as_bytes().to_vec(),
497494
binary: false,
498495
},
@@ -615,12 +612,14 @@ impl CustomServeTrait for PegboardGateway {
615612
async fn handle_websocket_hibernation(
616613
&self,
617614
client_ws: WebSocketHandle,
618-
unique_request_id: Uuid,
615+
unique_request_id: RequestId,
619616
) -> Result<HibernationResult> {
617+
let request_id = unique_request_id;
618+
620619
// Immediately rewake if we have pending messages
621620
if self
622621
.shared_state
623-
.has_pending_websocket_messages(unique_request_id.into_bytes())
622+
.has_pending_websocket_messages(request_id)
624623
.await?
625624
{
626625
tracing::debug!(
@@ -633,6 +632,8 @@ impl CustomServeTrait for PegboardGateway {
633632
// Start keepalive task
634633
let ctx = self.ctx.clone();
635634
let actor_id = self.actor_id;
635+
let gateway_id = self.shared_state.gateway_id();
636+
let request_id = unique_request_id;
636637
let keepalive_handle: JoinHandle<Result<()>> = tokio::spawn(async move {
637638
let mut ping_interval = tokio::time::interval(Duration::from_millis(
638639
(ctx.config()
@@ -652,7 +653,8 @@ impl CustomServeTrait for PegboardGateway {
652653

653654
ctx.op(pegboard::ops::actor::hibernating_request::upsert::Input {
654655
actor_id,
655-
request_id: unique_request_id,
656+
gateway_id,
657+
request_id,
656658
})
657659
.await?;
658660
}
@@ -669,6 +671,7 @@ impl CustomServeTrait for PegboardGateway {
669671
self.ctx
670672
.op(pegboard::ops::actor::hibernating_request::delete::Input {
671673
actor_id: self.actor_id,
674+
gateway_id: self.shared_state.gateway_id(),
672675
request_id: unique_request_id,
673676
})
674677
.await?;

0 commit comments

Comments
 (0)