Skip to content

Commit 0f3c329

Browse files
committed
chore: remove msg id parts from protocol, move id utils to runner protocol crate (#3508)
1 parent 89218a8 commit 0f3c329

File tree

28 files changed

+175
-240
lines changed

28 files changed

+175
-240
lines changed

Cargo.lock

Lines changed: 4 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

engine/packages/guard-core/Cargo.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,17 +25,17 @@ hyper-util = { workspace = true, features = ["full"] }
2525
indoc.workspace = true
2626
lazy_static.workspace = true
2727
moka = { workspace = true, features = ["future"] }
28-
pegboard.workspace = true
2928
rand.workspace = true
3029
regex.workspace = true
3130
rivet-api-builder.workspace = true
3231
rivet-config.workspace = true
3332
rivet-error.workspace = true
3433
rivet-metrics.workspace = true
34+
rivet-runner-protocol.workspace = true
3535
rivet-runtime.workspace = true
3636
rivet-util.workspace = true
37-
rustls.workspace = true
3837
rustls-pemfile.workspace = true
38+
rustls.workspace = true
3939
serde_json.workspace = true
4040
serde.workspace = true
4141
tokio-rustls.workspace = true

engine/packages/guard-core/src/custom_serve.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ use async_trait::async_trait;
33
use bytes::Bytes;
44
use http_body_util::Full;
55
use hyper::{Request, Response};
6-
use pegboard::tunnel::id::RequestId;
6+
use rivet_runner_protocol as protocol;
77
use tokio_tungstenite::tungstenite::protocol::frame::CloseFrame;
88

99
use crate::WebSocketHandle;
@@ -23,7 +23,7 @@ pub trait CustomServeTrait: Send + Sync {
2323
&self,
2424
req: Request<Full<Bytes>>,
2525
request_context: &mut RequestContext,
26-
request_id: RequestId,
26+
request_id: protocol::RequestId,
2727
) -> Result<Response<ResponseBody>>;
2828

2929
/// Handle a WebSocket connection after upgrade. Supports connection retries.
@@ -34,7 +34,7 @@ pub trait CustomServeTrait: Send + Sync {
3434
_path: &str,
3535
_request_context: &mut RequestContext,
3636
// Identifies the websocket across retries.
37-
_unique_request_id: RequestId,
37+
_unique_request_id: protocol::RequestId,
3838
// True if this websocket is reconnecting after hibernation.
3939
_after_hibernation: bool,
4040
) -> Result<Option<CloseFrame>> {
@@ -45,7 +45,7 @@ pub trait CustomServeTrait: Send + Sync {
4545
async fn handle_websocket_hibernation(
4646
&self,
4747
_websocket: WebSocketHandle,
48-
_unique_request_id: RequestId,
48+
_unique_request_id: protocol::RequestId,
4949
) -> Result<HibernationResult> {
5050
bail!("service does not support websocket hibernation");
5151
}

engine/packages/guard-core/src/lib.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@ pub mod websocket_handle;
1212

1313
pub use cert_resolver::CertResolverFn;
1414
pub use custom_serve::CustomServeTrait;
15-
pub use pegboard::tunnel::id::{RequestId, generate_request_id};
1615
pub use proxy_service::{
1716
CacheKeyFn, MiddlewareFn, ProxyService, ProxyState, RouteTarget, RoutingFn, RoutingOutput,
1817
};

engine/packages/guard-core/src/proxy_service.rs

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ use rivet_metrics::KeyValue;
1414
use rivet_util::Id;
1515
use serde_json;
1616

17-
use pegboard::tunnel::id::{RequestId, generate_request_id};
17+
use rivet_runner_protocol as protocol;
1818
use std::{
1919
borrow::Cow,
2020
collections::{HashMap as StdHashMap, HashSet},
@@ -350,7 +350,7 @@ pub struct ProxyState {
350350
route_cache: RouteCache,
351351
rate_limiters: Cache<(Id, std::net::IpAddr), Arc<Mutex<RateLimiter>>>,
352352
in_flight_counters: Cache<(Id, std::net::IpAddr), Arc<Mutex<InFlightCounter>>>,
353-
inflight_requests: Arc<Mutex<HashSet<RequestId>>>,
353+
in_flight_requests: Arc<Mutex<HashSet<protocol::RequestId>>>,
354354
port_type: PortType,
355355
clickhouse_inserter: Option<clickhouse_inserter::ClickHouseInserterHandle>,
356356
tasks: Arc<TaskGroup>,
@@ -379,7 +379,7 @@ impl ProxyState {
379379
.max_capacity(10_000)
380380
.time_to_live(PROXY_STATE_CACHE_TTL)
381381
.build(),
382-
inflight_requests: Arc::new(Mutex::new(HashSet::new())),
382+
in_flight_requests: Arc::new(Mutex::new(HashSet::new())),
383383
port_type,
384384
clickhouse_inserter,
385385
tasks: TaskGroup::new(),
@@ -603,7 +603,7 @@ impl ProxyState {
603603
ip_addr: std::net::IpAddr,
604604
actor_id: &Option<Id>,
605605
headers: &hyper::HeaderMap,
606-
) -> Result<Option<RequestId>> {
606+
) -> Result<Option<protocol::RequestId>> {
607607
// Check in-flight limit if actor_id is present
608608
if let Some(actor_id) = *actor_id {
609609
// Get actor-specific middleware config
@@ -648,7 +648,7 @@ impl ProxyState {
648648
&self,
649649
ip_addr: std::net::IpAddr,
650650
actor_id: &Option<Id>,
651-
request_id: RequestId,
651+
request_id: protocol::RequestId,
652652
) {
653653
// Release in-flight counter if actor_id is present
654654
if let Some(actor_id) = *actor_id {
@@ -660,17 +660,17 @@ impl ProxyState {
660660
}
661661

662662
// Release request ID
663-
let mut requests = self.inflight_requests.lock().await;
663+
let mut requests = self.in_flight_requests.lock().await;
664664
requests.remove(&request_id);
665665
}
666666

667667
/// Generate a unique request ID that is not currently in flight
668-
async fn generate_unique_request_id(&self) -> anyhow::Result<RequestId> {
668+
async fn generate_unique_request_id(&self) -> Result<protocol::RequestId> {
669669
const MAX_TRIES: u32 = 100;
670-
let mut requests = self.inflight_requests.lock().await;
670+
let mut requests = self.in_flight_requests.lock().await;
671671

672672
for attempt in 0..MAX_TRIES {
673-
let request_id = generate_request_id();
673+
let request_id = protocol::util::generate_request_id();
674674

675675
// Check if this ID is already in use
676676
if !requests.contains(&request_id) {
@@ -688,7 +688,7 @@ impl ProxyState {
688688
);
689689
}
690690

691-
anyhow::bail!(
691+
bail!(
692692
"failed to generate unique request id after {} attempts",
693693
MAX_TRIES
694694
);
@@ -2144,7 +2144,7 @@ impl ProxyService {
21442144
.release_in_flight(client_ip, &actor_id, request_id)
21452145
.await;
21462146

2147-
anyhow::Ok(())
2147+
Ok(())
21482148
}
21492149
.instrument(tracing::info_span!("handle_ws_task_custom_serve")),
21502150
);

engine/packages/guard/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ rivet-guard-core.workspace = true
3737
rivet-logs.workspace = true
3838
rivet-metrics.workspace = true
3939
rivet-pools.workspace = true
40+
rivet-runner-protocol.workspace = true
4041
rivet-runtime.workspace = true
4142
rustls-pemfile.workspace = true
4243
rustls.workspace = true

engine/packages/guard/src/routing/api_public.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,9 @@ use bytes::Bytes;
66
use gas::prelude::*;
77
use http_body_util::{BodyExt, Full};
88
use hyper::{Request, Response};
9-
use pegboard::tunnel::id::RequestId;
109
use rivet_guard_core::proxy_service::{ResponseBody, RoutingOutput};
1110
use rivet_guard_core::{CustomServeTrait, request_context::RequestContext};
11+
use rivet_runner_protocol as protocol;
1212
use tower::Service;
1313

1414
struct ApiPublicService {
@@ -21,7 +21,7 @@ impl CustomServeTrait for ApiPublicService {
2121
&self,
2222
req: Request<Full<Bytes>>,
2323
_request_context: &mut RequestContext,
24-
_request_id: RequestId,
24+
_request_id: protocol::RequestId,
2525
) -> Result<Response<ResponseBody>> {
2626
// Clone the router to get a mutable service
2727
let mut service = self.router.clone();

engine/packages/pegboard-gateway/src/keepalive_task.rs

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,7 @@
11
use anyhow::Result;
22
use gas::prelude::*;
3-
use pegboard::tunnel::id as tunnel_id;
4-
use pegboard::tunnel::id::{GatewayId, RequestId};
53
use rand::Rng;
4+
use rivet_runner_protocol as protocol;
65
use std::time::Duration;
76
use tokio::sync::watch;
87

@@ -17,8 +16,8 @@ pub async fn task(
1716
shared_state: SharedState,
1817
ctx: StandaloneCtx,
1918
actor_id: Id,
20-
gateway_id: GatewayId,
21-
request_id: RequestId,
19+
gateway_id: protocol::GatewayId,
20+
request_id: protocol::RequestId,
2221
mut keepalive_abort_rx: watch::Receiver<()>,
2322
) -> Result<LifecycleResult> {
2423
let mut ping_interval = tokio::time::interval(Duration::from_millis(
@@ -44,8 +43,8 @@ pub async fn task(
4443

4544
tracing::debug!(
4645
%actor_id,
47-
gateway_id=%tunnel_id::gateway_id_to_string(&gateway_id),
48-
request_id=%tunnel_id::request_id_to_string(&request_id),
46+
gateway_id=%protocol::util::id_to_string(&gateway_id),
47+
request_id=%protocol::util::id_to_string(&request_id),
4948
"updating hws keepalive"
5049
);
5150

engine/packages/pegboard-gateway/src/lib.rs

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ use futures_util::TryStreamExt;
55
use gas::prelude::*;
66
use http_body_util::{BodyExt, Full};
77
use hyper::{Request, Response, StatusCode};
8-
use pegboard::tunnel::id::{self as tunnel_id, RequestId};
98
use rivet_error::*;
109
use rivet_guard_core::{
1110
custom_serve::{CustomServeTrait, HibernationResult},
@@ -86,7 +85,7 @@ impl CustomServeTrait for PegboardGateway {
8685
&self,
8786
req: Request<Full<Bytes>>,
8887
_request_context: &mut RequestContext,
89-
request_id: RequestId,
88+
request_id: protocol::RequestId,
9089
) -> Result<Response<ResponseBody>> {
9190
// Use the actor ID from the gateway instance
9291
let actor_id = self.actor_id.to_string();
@@ -213,7 +212,7 @@ impl CustomServeTrait for PegboardGateway {
213212
}
214213
} else {
215214
tracing::warn!(
216-
request_id=%tunnel_id::request_id_to_string(&request_id),
215+
request_id=%protocol::util::id_to_string(&request_id),
217216
"received no message response during request init",
218217
);
219218
break;
@@ -268,14 +267,14 @@ impl CustomServeTrait for PegboardGateway {
268267
Ok(response)
269268
}
270269

271-
#[tracing::instrument(skip_all, fields(actor_id=?self.actor_id, runner_id=?self.runner_id, request_id=%tunnel_id::request_id_to_string(&request_id)))]
270+
#[tracing::instrument(skip_all, fields(actor_id=?self.actor_id, runner_id=?self.runner_id, request_id=%protocol::util::id_to_string(&request_id)))]
272271
async fn handle_websocket(
273272
&self,
274273
client_ws: WebSocketHandle,
275274
headers: &hyper::HeaderMap,
276275
_path: &str,
277276
_request_context: &mut RequestContext,
278-
request_id: RequestId,
277+
request_id: protocol::RequestId,
279278
after_hibernation: bool,
280279
) -> Result<Option<CloseFrame>> {
281280
// Use the actor ID from the gateway instance
@@ -354,7 +353,7 @@ impl CustomServeTrait for PegboardGateway {
354353
}
355354
} else {
356355
tracing::warn!(
357-
request_id=%tunnel_id::request_id_to_string(&request_id),
356+
request_id=%protocol::util::id_to_string(&request_id),
358357
"received no message response during ws init",
359358
);
360359
break;
@@ -572,11 +571,11 @@ impl CustomServeTrait for PegboardGateway {
572571
}
573572
}
574573

575-
#[tracing::instrument(skip_all, fields(actor_id=?self.actor_id, request_id=%tunnel_id::request_id_to_string(&request_id)))]
574+
#[tracing::instrument(skip_all, fields(actor_id=?self.actor_id, request_id=%protocol::util::id_to_string(&request_id)))]
576575
async fn handle_websocket_hibernation(
577576
&self,
578577
client_ws: WebSocketHandle,
579-
request_id: RequestId,
578+
request_id: protocol::RequestId,
580579
) -> Result<HibernationResult> {
581580
// Immediately rewake if we have pending messages
582581
if self

0 commit comments

Comments
 (0)