Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion engine/packages/guard-core/src/custom_serve.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ use async_trait::async_trait;
use bytes::Bytes;
use http_body_util::Full;
use hyper::{Request, Response};
use tokio_tungstenite::tungstenite::protocol::frame::CloseFrame;
use pegboard::tunnel::id::RequestId;
use tokio_tungstenite::tungstenite::protocol::frame::CloseFrame;

use crate::WebSocketHandle;
use crate::proxy_service::ResponseBody;
Expand Down
29 changes: 15 additions & 14 deletions engine/packages/guard-core/src/proxy_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -612,19 +612,20 @@ impl ProxyState {
let cache_key = (actor_id, ip_addr);

// Get existing counter or create a new one
let counter_arc =
if let Some(existing_counter) = self.in_flight_counters.get(&cache_key).await {
existing_counter
} else {
let new_counter = Arc::new(Mutex::new(InFlightCounter::new(
middleware_config.max_in_flight.amount,
)));
self.in_flight_counters
.insert(cache_key, new_counter.clone())
.await;
metrics::IN_FLIGHT_COUNTER_COUNT.record(self.in_flight_counters.entry_count(), &[]);
new_counter
};
let counter_arc = if let Some(existing_counter) =
self.in_flight_counters.get(&cache_key).await
{
existing_counter
} else {
let new_counter = Arc::new(Mutex::new(InFlightCounter::new(
middleware_config.max_in_flight.amount,
)));
self.in_flight_counters
.insert(cache_key, new_counter.clone())
.await;
metrics::IN_FLIGHT_COUNTER_COUNT.record(self.in_flight_counters.entry_count(), &[]);
new_counter
};

// Try to acquire from the counter
let acquired = {
Expand All @@ -638,7 +639,7 @@ impl ProxyState {
}

// Generate unique request ID
let request_id = Some(self.generate_unique_request_id().await?);
let request_id = Some(self.generate_unique_request_id().await?);
Ok(request_id)
}

Expand Down
127 changes: 127 additions & 0 deletions engine/packages/pegboard-runner/src/actor_event_demuxer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
use std::collections::HashMap;
use std::time::{Duration, Instant};

use anyhow::Result;
use gas::prelude::*;
use rivet_runner_protocol as protocol;
use tokio::sync::mpsc;
use tokio::task::JoinHandle;

/// Minimum time that must elapse between two garbage collection passes over the actor channels.
const GC_INTERVAL: Duration = Duration::from_secs(30);
/// Channels whose `last_seen` timestamp is at least this old are removed during garbage
/// collection.
const MAX_LAST_SEEN: Duration = Duration::from_secs(30);

/// Per-actor event queue together with the task that drains it.
struct Channel {
    /// Sending half of the unbounded queue read by the actor's processing task.
    tx: mpsc::UnboundedSender<protocol::Event>,
    /// Handle of the spawned task that batches queued events and dispatches them.
    handle: JoinHandle<()>,
    /// Timestamp compared against `MAX_LAST_SEEN` during garbage collection.
    last_seen: Instant,
}

/// Demultiplexes incoming events into one queue per actor.
///
/// Each actor gets its own spawned task which receives events in batches and forwards them to
/// the actor workflow as a single signal.
pub struct ActorEventDemuxer {
    /// Context cloned into every spawned per-actor task and used to send signals.
    ctx: StandaloneCtx,
    /// Live channels, keyed by actor ID.
    channels: HashMap<Id, Channel>,
    /// Time of the most recent garbage collection pass.
    last_gc: Instant,
}

impl ActorEventDemuxer {
pub fn new(ctx: StandaloneCtx) -> Self {
Self {
ctx,
channels: HashMap::new(),
last_gc: Instant::now(),
}
}

/// Process an event by routing it to the appropriate actor's queue
#[tracing::instrument(skip_all)]
pub fn ingest(&mut self, actor_id: Id, event: protocol::Event) {
if let Some(channel) = self.channels.get(&actor_id) {
let _ = channel.tx.send(event);
} else {
let (tx, mut rx) = mpsc::unbounded_channel();

let ctx = self.ctx.clone();
let handle = tokio::spawn(async move {
loop {
let mut buffer = Vec::new();

// Batch process events
if rx.recv_many(&mut buffer, 1024).await == 0 {
break;
}

if let Err(err) = dispatch_events(&ctx, actor_id, buffer).await {
tracing::error!(?err, "actor event processor failed");
break;
}
}
});

self.channels.insert(
actor_id,
Channel {
tx,
handle,
last_seen: Instant::now(),
},
);
}

// Run gc periodically
if self.last_gc.elapsed() > GC_INTERVAL {
self.last_gc = Instant::now();

self.channels.retain(|_, channel| {
let keep = channel.last_seen.elapsed() < MAX_LAST_SEEN;

if !keep {
// TODO: Verify aborting is safe here
channel.handle.abort();
}

keep
});
}
}

/// Shutdown all tasks and wait for them to complete
#[tracing::instrument(skip_all)]
pub async fn shutdown(self) {
tracing::debug!(channels=?self.channels.len(), "shutting down actor demuxer");

// Drop all senders
let handles = self
.channels
.into_iter()
.map(|(_, channel)| channel.handle)
.collect::<Vec<_>>();

// Await remaining tasks
for handle in handles {
let _ = handle.await;
}

tracing::debug!("actor demuxer shut down");
}
}

/// Forwards a batch of events to the workflow tagged with `actor_id` as one `Events` signal.
///
/// A missing workflow is not treated as an error: it is logged as a warning and `Ok(())` is
/// returned. Any other failure to send the signal is propagated with added context.
async fn dispatch_events(
    ctx: &StandaloneCtx,
    actor_id: Id,
    events: Vec<protocol::Event>,
) -> Result<()> {
    let signal = pegboard::workflows::actor::Events { inner: events };

    // `graceful_not_found` turns "no such workflow" into an empty result instead of an error.
    let delivered = ctx
        .signal(signal)
        .tag("actor_id", actor_id)
        .graceful_not_found()
        .send()
        .await
        .with_context(|| format!("failed to forward signal to actor workflow: {}", actor_id))?
        .is_some();

    if !delivered {
        tracing::warn!(
            ?actor_id,
            "failed to send signal to actor workflow, likely already stopped"
        );
    }

    Ok(())
}
59 changes: 42 additions & 17 deletions engine/packages/pegboard-runner/src/conn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ pub async fn init_conn(
}
};

let packet = versioned::ToServer::deserialize(&buf, protocol_version)
let init_packet = versioned::ToServer::deserialize(&buf, protocol_version)
.map_err(|err| WsError::InvalidPacket(err.to_string()).build())
.context("failed to deserialize initial packet from client")?;

Expand All @@ -74,7 +74,7 @@ pub async fn init_conn(
version,
total_slots,
..
}) = &packet
}) = &init_packet
{
// Look up existing runner by key
let existing_runner = ctx
Expand Down Expand Up @@ -127,14 +127,15 @@ pub async fn init_conn(
};

// Spawn a new runner workflow if one doesn't already exist
let workflow_id = ctx
.workflow(pegboard::workflows::runner2::Input {
let workflow_id = if protocol::is_new(protocol_version) {
ctx.workflow(pegboard::workflows::runner2::Input {
runner_id,
namespace_id: namespace.namespace_id,
name: name.clone(),
key: runner_key.clone(),
version: version.clone(),
total_slots: *total_slots,
protocol_version,
})
.tag("runner_id", runner_id)
.unique()
Expand All @@ -145,25 +146,49 @@ pub async fn init_conn(
"failed to dispatch runner workflow for runner: {}",
runner_id
)
})?;
})?
} else {
ctx.workflow(pegboard::workflows::runner::Input {
runner_id,
namespace_id: namespace.namespace_id,
name: name.clone(),
key: runner_key.clone(),
version: version.clone(),
total_slots: *total_slots,
})
.tag("runner_id", runner_id)
.unique()
.dispatch()
.await
.with_context(|| {
format!(
"failed to dispatch runner workflow for runner: {}",
runner_id
)
})?
};

(name.clone(), runner_id, workflow_id)
} else {
tracing::debug!(?packet, "invalid initial packet");
tracing::debug!(?init_packet, "invalid initial packet");
return Err(WsError::InvalidInitialPacket("must be `ToServer::Init`").build());
};

// Forward to runner wf
ctx.signal(pegboard::workflows::runner2::Forward { inner: packet })
.to_workflow_id(workflow_id)
.send()
.await
.with_context(|| {
format!(
"failed to forward initial packet to workflow: {}",
workflow_id
)
})?;
if protocol::is_new(protocol_version) {
ctx.signal(Init);
} else {
// Forward to runner wf
ctx.signal(pegboard::workflows::runner::Forward { inner: init_packet })
.to_workflow_id(workflow_id)
.send()
.await
.with_context(|| {
format!(
"failed to forward initial packet to workflow: {}",
workflow_id
)
})?;
}

(runner_name, runner_id, workflow_id)
} else {
Expand Down
4 changes: 4 additions & 0 deletions engine/packages/pegboard-runner/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use tokio::sync::watch;
use tokio_tungstenite::tungstenite::protocol::frame::CloseFrame;
use universalpubsub::PublishOpts;

mod actor_event_demuxer;
mod conn;
mod errors;
mod ping_task;
Expand Down Expand Up @@ -136,12 +137,14 @@ impl CustomServeTrait for PegboardRunnerWsCustomServe {
let (tunnel_to_ws_abort_tx, tunnel_to_ws_abort_rx) = watch::channel(());
let (ws_to_tunnel_abort_tx, ws_to_tunnel_abort_rx) = watch::channel(());
let (ping_abort_tx, ping_abort_rx) = watch::channel(());
let (init_tx, init_rx) = watch::channel(());

let tunnel_to_ws = tokio::spawn(tunnel_to_ws_task::task(
self.ctx.clone(),
conn.clone(),
sub,
eviction_sub,
init_rx,
tunnel_to_ws_abort_rx,
));

Expand All @@ -150,6 +153,7 @@ impl CustomServeTrait for PegboardRunnerWsCustomServe {
conn.clone(),
ws_handle.recv(),
eviction_sub2,
init_tx,
ws_to_tunnel_abort_rx,
));

Expand Down
42 changes: 33 additions & 9 deletions engine/packages/pegboard-runner/src/ping_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,32 @@ pub async fn task(
}

update_runner_ping(&ctx, &conn).await?;

// Send ping to runner
let ping_msg = versioned::ToClient::wrap_latest(protocol::ToClient::ToClientPing(
protocol::ToClientPing {
ts: util::timestamp::now(),
},
));
let ping_msg_serialized = ping_msg.serialize(conn.protocol_version)?;
conn.ws_handle
.send(Message::Binary(ping_msg_serialized.into()))
.await?;
}
}

async fn update_runner_ping(ctx: &StandaloneCtx, conn: &Conn) -> Result<()> {
let Some(wf) = ctx
.workflow::<pegboard::workflows::runner2::Input>(conn.workflow_id)
.get()
.await?
else {
let wf = if protocol::is_mk2(conn.protocol_version) {
ctx.workflow::<pegboard::workflows::runner2::Input>(conn.workflow_id)
.get()
.await?
} else {
ctx.workflow::<pegboard::workflows::runner::Input>(conn.workflow_id)
.get()
.await?
};

let Some(wf) = wf else {
tracing::error!(?conn.runner_id, "workflow does not exist");
return Ok(());
};
Expand All @@ -55,10 +72,17 @@ async fn update_runner_ping(ctx: &StandaloneCtx, conn: &Conn) -> Result<()> {
if let RunnerEligibility::ReEligible = notif.eligibility {
tracing::debug!(runner_id=?notif.runner_id, "runner has become eligible again");

ctx.signal(pegboard::workflows::runner2::CheckQueue {})
.to_workflow_id(notif.workflow_id)
.send()
.await?;
if protocol::is_mk2(conn.protocol_version) {
ctx.signal(pegboard::workflows::runner2::CheckQueue {})
.to_workflow_id(notif.workflow_id)
.send()
.await?;
} else {
ctx.signal(pegboard::workflows::runner::CheckQueue {})
.to_workflow_id(notif.workflow_id)
.send()
.await?;
}
}
}

Expand Down
Loading
Loading