Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 8 additions & 5 deletions engine/packages/pegboard-gateway/src/keepalive_task.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use anyhow::Result;
use gas::prelude::*;
use pegboard::tunnel::id as tunnel_id;
use pegboard::tunnel::id::{GatewayId, RequestId};
use rand::Rng;
use std::time::Duration;
Expand All @@ -12,7 +13,6 @@ use crate::shared_state::SharedState;
/// next actor start.
///
/// Only ran for hibernating requests.

pub async fn task(
shared_state: SharedState,
ctx: StandaloneCtx,
Expand All @@ -30,10 +30,6 @@ pub async fn task(
));
ping_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);

// Discard the first tick since it fires immediately and we've already called this
// above
ping_interval.tick().await;

loop {
tokio::select! {
_ = ping_interval.tick() => {}
Expand All @@ -46,6 +42,13 @@ pub async fn task(
let jitter = { rand::thread_rng().gen_range(0..128) };
tokio::time::sleep(Duration::from_millis(jitter)).await;

tracing::debug!(
%actor_id,
gateway_id=%tunnel_id::gateway_id_to_string(&gateway_id),
request_id=%tunnel_id::request_id_to_string(&request_id),
"updating hws keepalive"
);

tokio::try_join!(
ctx.op(pegboard::ops::actor::hibernating_request::upsert::Input {
actor_id,
Expand Down
18 changes: 4 additions & 14 deletions engine/packages/pegboard-gateway/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,20 +8,20 @@ use hyper::{Request, Response, StatusCode};
use pegboard::tunnel::id::{self as tunnel_id, RequestId};
use rivet_error::*;
use rivet_guard_core::{
WebSocketHandle,
custom_serve::{CustomServeTrait, HibernationResult},
errors::{ServiceUnavailable, WebSocketServiceUnavailable},
proxy_service::{ResponseBody, is_ws_hibernate},
proxy_service::{is_ws_hibernate, ResponseBody},
request_context::RequestContext,
websocket_handle::WebSocketReceiver,
WebSocketHandle,
};
use rivet_runner_protocol as protocol;
use rivet_util::serde::HashableMap;
use std::{sync::Arc, time::Duration};
use tokio::sync::{Mutex, watch};
use tokio::sync::{watch, Mutex};
use tokio_tungstenite::tungstenite::{
protocol::frame::{coding::CloseCode, CloseFrame},
Message,
protocol::frame::{CloseFrame, coding::CloseCode},
};

use crate::shared_state::{InFlightRequestHandle, SharedState};
Expand Down Expand Up @@ -578,16 +578,6 @@ impl CustomServeTrait for PegboardGateway {
client_ws: WebSocketHandle,
request_id: RequestId,
) -> Result<HibernationResult> {
// Insert hibernating request entry before checking for pending messages
// This ensures the entry exists even if we immediately rewake the actor
self.ctx
.op(pegboard::ops::actor::hibernating_request::upsert::Input {
actor_id: self.actor_id,
gateway_id: self.shared_state.gateway_id(),
request_id,
})
.await?;

// Immediately rewake if we have pending messages
if self
.shared_state
Expand Down
14 changes: 10 additions & 4 deletions engine/packages/pegboard-gateway/src/shared_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ use anyhow::Result;
use gas::prelude::*;
use pegboard::tunnel::id::{self as tunnel_id, GatewayId, RequestId};
use rivet_guard_core::errors::WebSocketServiceTimeout;
use rivet_runner_protocol::{self as protocol, PROTOCOL_VERSION, versioned};
use scc::{HashMap, hash_map::Entry};
use rivet_runner_protocol::{self as protocol, versioned, PROTOCOL_VERSION};
use scc::{hash_map::Entry, HashMap};
use std::{
ops::Deref,
sync::Arc,
Expand All @@ -13,7 +13,7 @@ use tokio::sync::{mpsc, watch};
use universalpubsub::{NextOutput, PubSub, PublishOpts, Subscriber};
use vbare::OwnedVersionedData;

use crate::{WebsocketPendingLimitReached, metrics};
use crate::{metrics, WebsocketPendingLimitReached};

const GC_INTERVAL: Duration = Duration::from_secs(15);
const TUNNEL_PING_TIMEOUT: i64 = util::duration::seconds(30);
Expand Down Expand Up @@ -512,7 +512,13 @@ impl SharedState {
}
}

if hs.last_ping.elapsed() > hibernation_timeout {
let hs_elapsed = hs.last_ping.elapsed();
tracing::debug!(
hs_elapsed=%hs_elapsed.as_secs_f64(),
timeout=%hibernation_timeout.as_secs_f64(),
"checking hibernating state elapsed time"
);
if hs_elapsed> hibernation_timeout {
break 'reason Some(MsgGcReason::HibernationTimeout);
}
} else if req.msg_tx.is_closed() {
Expand Down
4 changes: 1 addition & 3 deletions engine/sdks/schemas/runner-protocol/v3.bare

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

36 changes: 3 additions & 33 deletions engine/sdks/typescript/runner-protocol/src/index.ts

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

25 changes: 12 additions & 13 deletions engine/sdks/typescript/runner/src/actor.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
import type * as protocol from "@rivetkit/engine-runner-protocol";
import { logger } from "./log";
import type { PendingRequest } from "./tunnel";
import { arraysEqual, idToStr, promiseWithResolvers } from "./utils";
import type { WebSocketTunnelAdapter } from "./websocket-tunnel-adapter";
import { arraysEqual, promiseWithResolvers } from "./utils";
import { logger } from "./log";
import * as tunnelId from "./tunnel-id";

export interface ActorConfig {
name: string;
Expand Down Expand Up @@ -72,8 +71,8 @@ export class RunnerActor {
if (exists) {
logger()?.warn({
msg: "attempting to set pending request twice, replacing existing",
gatewayId: tunnelId.gatewayIdToString(gatewayId),
requestId: tunnelId.requestIdToString(requestId),
gatewayId: idToStr(gatewayId),
requestId: idToStr(requestId),
});
// Delete existing pending request before adding the new one
this.deletePendingRequest(gatewayId, requestId);
Expand All @@ -92,8 +91,8 @@ export class RunnerActor {
});
logger()?.debug({
msg: "added pending request",
gatewayId: tunnelId.gatewayIdToString(gatewayId),
requestId: tunnelId.requestIdToString(requestId),
gatewayId: idToStr(gatewayId),
requestId: idToStr(requestId),
length: this.pendingRequests.length,
});
}
Expand All @@ -109,8 +108,8 @@ export class RunnerActor {
if (exists) {
logger()?.warn({
msg: "attempting to set pending request twice, replacing existing",
gatewayId: tunnelId.gatewayIdToString(gatewayId),
requestId: tunnelId.requestIdToString(requestId),
gatewayId: idToStr(gatewayId),
requestId: idToStr(requestId),
});
// Delete existing pending request before adding the new one
this.deletePendingRequest(gatewayId, requestId);
Expand All @@ -130,8 +129,8 @@ export class RunnerActor {
});
logger()?.debug({
msg: "added pending request with stream controller",
gatewayId: tunnelId.gatewayIdToString(gatewayId),
requestId: tunnelId.requestIdToString(requestId),
gatewayId: idToStr(gatewayId),
requestId: idToStr(requestId),
length: this.pendingRequests.length,
});
}
Expand All @@ -149,8 +148,8 @@ export class RunnerActor {
this.pendingRequests.splice(index, 1);
logger()?.debug({
msg: "removed pending request",
gatewayId: tunnelId.gatewayIdToString(gatewayId),
requestId: tunnelId.requestIdToString(requestId),
gatewayId: idToStr(gatewayId),
requestId: idToStr(requestId),
length: this.pendingRequests.length,
});
}
Expand Down
2 changes: 1 addition & 1 deletion engine/sdks/typescript/runner/src/mod.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@
import { importWebSocket } from "./websocket.js";

export type { HibernatingWebSocketMetadata };
export * as tunnelId from "./tunnel-id";
export { RunnerActor, type ActorConfig };
export { idToStr } from "./utils";

const KV_EXPIRE: number = 30_000;
const PROTOCOL_VERSION: number = 3;
Expand Down Expand Up @@ -199,7 +199,7 @@
runnerId?: string;
#lastCommandIdx: number = -1;
#pingLoop?: NodeJS.Timeout;
#nextEventIdx: bigint = 0n;

Check warning on line 202 in engine/sdks/typescript/runner/src/mod.ts

View workflow job for this annotation

GitHub Actions / quality

lint/correctness/noUnusedPrivateClassMembers

This private class member is defined but never used.
#started: boolean = false;
#shutdown: boolean = false;
#shuttingDown: boolean = false;
Expand Down
34 changes: 24 additions & 10 deletions engine/sdks/typescript/runner/src/stringify.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import type * as protocol from "@rivetkit/engine-runner-protocol";
import { idToStr } from "./utils";

/**
* Helper function to stringify ArrayBuffer for logging
Expand All @@ -24,6 +25,13 @@ function stringifyMap(map: ReadonlyMap<string, string>): string {
return `Map(${map.size}){${entries}}`;
}

/**
* Helper function to stringify MessageId for logging
*/
function stringifyMessageId(messageId: protocol.MessageId): string {
return `MessageId{gatewayId: ${idToStr(messageId.gatewayId)}, requestId: ${idToStr(messageId.requestId)}, messageIndex: ${messageId.messageIndex}}`;
}

/**
* Stringify ToServerTunnelMessageKind for logging
* Handles ArrayBuffers, BigInts, and Maps that can't be JSON.stringified
Expand Down Expand Up @@ -111,15 +119,17 @@ export function stringifyToClientTunnelMessageKind(
export function stringifyCommand(command: protocol.Command): string {
switch (command.tag) {
case "CommandStartActor": {
const { actorId, generation, config, hibernatingRequests } = command.val;
const { actorId, generation, config, hibernatingRequests } =
command.val;
const keyStr = config.key === null ? "null" : `"${config.key}"`;
const inputStr =
config.input === null
? "null"
: stringifyArrayBuffer(config.input);
const hibernatingRequestsStr = hibernatingRequests.length > 0
? `[${hibernatingRequests.map((hr) => `{gatewayId: ${stringifyArrayBuffer(hr.gatewayId)}, requestId: ${stringifyArrayBuffer(hr.requestId)}}`).join(", ")}]`
: "[]";
const hibernatingRequestsStr =
hibernatingRequests.length > 0
? `[${hibernatingRequests.map((hr) => `{gatewayId: ${idToStr(hr.gatewayId)}, requestId: ${idToStr(hr.requestId)}}`).join(", ")}]`
: "[]";
return `CommandStartActor{actorId: "${actorId}", generation: ${generation}, config: {name: "${config.name}", key: ${keyStr}, createTs: ${stringifyBigInt(config.createTs)}, input: ${inputStr}}, hibernatingRequests: ${hibernatingRequestsStr}}`;
}
case "CommandStopActor": {
Expand Down Expand Up @@ -193,8 +203,14 @@ export function stringifyEventWrapper(wrapper: protocol.EventWrapper): string {
export function stringifyToServer(message: protocol.ToServer): string {
switch (message.tag) {
case "ToServerInit": {
const { name, version, totalSlots, lastCommandIdx, prepopulateActorNames, metadata } =
message.val;
const {
name,
version,
totalSlots,
lastCommandIdx,
prepopulateActorNames,
metadata,
} = message.val;
const lastCommandIdxStr =
lastCommandIdx === null
? "null"
Expand Down Expand Up @@ -227,8 +243,7 @@ export function stringifyToServer(message: protocol.ToServer): string {
}
case "ToServerTunnelMessage": {
const { messageId, messageKind } = message.val;
const messageIdStr = stringifyArrayBuffer(messageId);
return `ToServerTunnelMessage{messageId: ${messageIdStr}, messageKind: ${stringifyToServerTunnelMessageKind(messageKind)}}`;
return `ToServerTunnelMessage{messageId: ${stringifyMessageId(messageId)}, messageKind: ${stringifyToServerTunnelMessageKind(messageKind)}}`;
}
}
}
Expand Down Expand Up @@ -261,8 +276,7 @@ export function stringifyToClient(message: protocol.ToClient): string {
}
case "ToClientTunnelMessage": {
const { messageId, messageKind } = message.val;
const messageIdStr = stringifyArrayBuffer(messageId);
return `ToClientTunnelMessage{messageId: ${messageIdStr}, messageKind: ${stringifyToClientTunnelMessageKind(messageKind)}}`;
return `ToClientTunnelMessage{messageId: ${stringifyMessageId(messageId)}, messageKind: ${stringifyToClientTunnelMessageKind(messageKind)}}`;
}
}
}
Expand Down
Loading
Loading