Skip to content

Commit bc2bb7c

Browse files
committed
fix(pb): simplify runner wf
1 parent 5972ed8 commit bc2bb7c

File tree

21 files changed

+1824
-837
lines changed

21 files changed

+1824
-837
lines changed
Lines changed: 127 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,127 @@
1+
use std::collections::HashMap;
2+
use std::time::{Duration, Instant};
3+
4+
use anyhow::Result;
5+
use gas::prelude::*;
6+
use rivet_runner_protocol as protocol;
7+
use tokio::sync::mpsc;
8+
use tokio::task::JoinHandle;
9+
10+
const GC_INTERVAL: Duration = Duration::from_secs(30);
11+
const MAX_LAST_SEEN: Duration = Duration::from_secs(30);
12+
13+
struct Channel {
14+
tx: mpsc::UnboundedSender<protocol::Event>,
15+
handle: JoinHandle<()>,
16+
last_seen: Instant,
17+
}
18+
19+
pub struct ActorEventDemuxer {
20+
ctx: StandaloneCtx,
21+
channels: HashMap<Id, Channel>,
22+
last_gc: Instant,
23+
}
24+
25+
impl ActorEventDemuxer {
26+
pub fn new(ctx: StandaloneCtx) -> Self {
27+
Self {
28+
ctx,
29+
channels: HashMap::new(),
30+
last_gc: Instant::now(),
31+
}
32+
}
33+
34+
/// Process an event by routing it to the appropriate actor's queue
35+
#[tracing::instrument(skip_all)]
36+
pub fn ingest(&mut self, actor_id: Id, event: protocol::Event) {
37+
if let Some(channel) = self.channels.get(&actor_id) {
38+
let _ = channel.tx.send(event);
39+
} else {
40+
let (tx, mut rx) = mpsc::unbounded_channel();
41+
42+
let ctx = self.ctx.clone();
43+
let handle = tokio::spawn(async move {
44+
loop {
45+
let mut buffer = Vec::new();
46+
47+
// Batch process events
48+
if rx.recv_many(&mut buffer, 1024).await == 0 {
49+
break;
50+
}
51+
52+
if let Err(err) = dispatch_events(&ctx, actor_id, buffer).await {
53+
tracing::error!(?err, "actor event processor failed");
54+
break;
55+
}
56+
}
57+
});
58+
59+
self.channels.insert(
60+
actor_id,
61+
Channel {
62+
tx,
63+
handle,
64+
last_seen: Instant::now(),
65+
},
66+
);
67+
}
68+
69+
// Run gc periodically
70+
if self.last_gc.elapsed() > GC_INTERVAL {
71+
self.last_gc = Instant::now();
72+
73+
self.channels.retain(|_, channel| {
74+
let keep = channel.last_seen.elapsed() < MAX_LAST_SEEN;
75+
76+
if !keep {
77+
// TODO: Verify aborting is safe here
78+
channel.handle.abort();
79+
}
80+
81+
keep
82+
});
83+
}
84+
}
85+
86+
/// Shutdown all tasks and wait for them to complete
87+
#[tracing::instrument(skip_all)]
88+
pub async fn shutdown(self) {
89+
tracing::debug!(channels=?self.channels.len(), "shutting down actor demuxer");
90+
91+
// Drop all senders
92+
let handles = self
93+
.channels
94+
.into_iter()
95+
.map(|(_, channel)| channel.handle)
96+
.collect::<Vec<_>>();
97+
98+
// Await remaining tasks
99+
for handle in handles {
100+
let _ = handle.await;
101+
}
102+
103+
tracing::debug!("actor demuxer shut down");
104+
}
105+
}
106+
107+
async fn dispatch_events(
108+
ctx: &StandaloneCtx,
109+
actor_id: Id,
110+
events: Vec<protocol::Event>,
111+
) -> Result<()> {
112+
let res = ctx
113+
.signal(pegboard::workflows::actor::Events { inner: events })
114+
.tag("actor_id", actor_id)
115+
.graceful_not_found()
116+
.send()
117+
.await
118+
.with_context(|| format!("failed to forward signal to actor workflow: {}", actor_id))?;
119+
if res.is_none() {
120+
tracing::warn!(
121+
?actor_id,
122+
"failed to send signal to actor workflow, likely already stopped"
123+
);
124+
}
125+
126+
Ok(())
127+
}

engine/packages/pegboard-runner/src/conn.rs

Lines changed: 42 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ pub async fn init_conn(
6464
}
6565
};
6666

67-
let packet = versioned::ToServer::deserialize(&buf, protocol_version)
67+
let init_packet = versioned::ToServer::deserialize(&buf, protocol_version)
6868
.map_err(|err| WsError::InvalidPacket(err.to_string()).build())
6969
.context("failed to deserialize initial packet from client")?;
7070

@@ -74,7 +74,7 @@ pub async fn init_conn(
7474
version,
7575
total_slots,
7676
..
77-
}) = &packet
77+
}) = &init_packet
7878
{
7979
// Look up existing runner by key
8080
let existing_runner = ctx
@@ -127,14 +127,15 @@ pub async fn init_conn(
127127
};
128128

129129
// Spawn a new runner workflow if one doesn't already exist
130-
let workflow_id = ctx
131-
.workflow(pegboard::workflows::runner2::Input {
130+
let workflow_id = if protocol::is_new(protocol_version) {
131+
ctx.workflow(pegboard::workflows::runner2::Input {
132132
runner_id,
133133
namespace_id: namespace.namespace_id,
134134
name: name.clone(),
135135
key: runner_key.clone(),
136136
version: version.clone(),
137137
total_slots: *total_slots,
138+
protocol_version,
138139
})
139140
.tag("runner_id", runner_id)
140141
.unique()
@@ -145,25 +146,49 @@ pub async fn init_conn(
145146
"failed to dispatch runner workflow for runner: {}",
146147
runner_id
147148
)
148-
})?;
149+
})?
150+
} else {
151+
ctx.workflow(pegboard::workflows::runner::Input {
152+
runner_id,
153+
namespace_id: namespace.namespace_id,
154+
name: name.clone(),
155+
key: runner_key.clone(),
156+
version: version.clone(),
157+
total_slots: *total_slots,
158+
})
159+
.tag("runner_id", runner_id)
160+
.unique()
161+
.dispatch()
162+
.await
163+
.with_context(|| {
164+
format!(
165+
"failed to dispatch runner workflow for runner: {}",
166+
runner_id
167+
)
168+
})?
169+
};
149170

150171
(name.clone(), runner_id, workflow_id)
151172
} else {
152-
tracing::debug!(?packet, "invalid initial packet");
173+
tracing::debug!(?init_packet, "invalid initial packet");
153174
return Err(WsError::InvalidInitialPacket("must be `ToServer::Init`").build());
154175
};
155176

156-
// Forward to runner wf
157-
ctx.signal(pegboard::workflows::runner2::Forward { inner: packet })
158-
.to_workflow_id(workflow_id)
159-
.send()
160-
.await
161-
.with_context(|| {
162-
format!(
163-
"failed to forward initial packet to workflow: {}",
164-
workflow_id
165-
)
166-
})?;
177+
if protocol::is_new(protocol_version) {
178+
ctx.signal(Init);
179+
} else {
180+
// Forward to runner wf
181+
ctx.signal(pegboard::workflows::runner::Forward { inner: init_packet })
182+
.to_workflow_id(workflow_id)
183+
.send()
184+
.await
185+
.with_context(|| {
186+
format!(
187+
"failed to forward initial packet to workflow: {}",
188+
workflow_id
189+
)
190+
})?;
191+
}
167192

168193
(runner_name, runner_id, workflow_id)
169194
} else {

engine/packages/pegboard-runner/src/lib.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ use tokio::sync::watch;
1515
use tokio_tungstenite::tungstenite::protocol::frame::CloseFrame;
1616
use universalpubsub::PublishOpts;
1717

18+
mod actor_event_demuxer;
1819
mod conn;
1920
mod errors;
2021
mod ping_task;
@@ -136,12 +137,14 @@ impl CustomServeTrait for PegboardRunnerWsCustomServe {
136137
let (tunnel_to_ws_abort_tx, tunnel_to_ws_abort_rx) = watch::channel(());
137138
let (ws_to_tunnel_abort_tx, ws_to_tunnel_abort_rx) = watch::channel(());
138139
let (ping_abort_tx, ping_abort_rx) = watch::channel(());
140+
let (init_tx, init_rx) = watch::channel(());
139141

140142
let tunnel_to_ws = tokio::spawn(tunnel_to_ws_task::task(
141143
self.ctx.clone(),
142144
conn.clone(),
143145
sub,
144146
eviction_sub,
147+
init_rx,
145148
tunnel_to_ws_abort_rx,
146149
));
147150

@@ -150,6 +153,7 @@ impl CustomServeTrait for PegboardRunnerWsCustomServe {
150153
conn.clone(),
151154
ws_handle.recv(),
152155
eviction_sub2,
156+
init_tx,
153157
ws_to_tunnel_abort_rx,
154158
));
155159

engine/packages/pegboard-runner/src/ping_task.rs

Lines changed: 33 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -21,15 +21,32 @@ pub async fn task(
2121
}
2222

2323
update_runner_ping(&ctx, &conn).await?;
24+
25+
// Send ping to runner
26+
let ping_msg = versioned::ToClient::wrap_latest(protocol::ToClient::ToClientPing(
27+
protocol::ToClientPing {
28+
ts: util::timestamp::now(),
29+
},
30+
));
31+
let ping_msg_serialized = ping_msg.serialize(conn.protocol_version)?;
32+
conn.ws_handle
33+
.send(Message::Binary(ping_msg_serialized.into()))
34+
.await?;
2435
}
2536
}
2637

2738
async fn update_runner_ping(ctx: &StandaloneCtx, conn: &Conn) -> Result<()> {
28-
let Some(wf) = ctx
29-
.workflow::<pegboard::workflows::runner2::Input>(conn.workflow_id)
30-
.get()
31-
.await?
32-
else {
39+
let wf = if protocol::is_mk2(conn.protocol_version) {
40+
ctx.workflow::<pegboard::workflows::runner2::Input>(conn.workflow_id)
41+
.get()
42+
.await?
43+
} else {
44+
ctx.workflow::<pegboard::workflows::runner::Input>(conn.workflow_id)
45+
.get()
46+
.await?
47+
};
48+
49+
let Some(wf) = wf else {
3350
tracing::error!(?conn.runner_id, "workflow does not exist");
3451
return Ok(());
3552
};
@@ -55,10 +72,17 @@ async fn update_runner_ping(ctx: &StandaloneCtx, conn: &Conn) -> Result<()> {
5572
if let RunnerEligibility::ReEligible = notif.eligibility {
5673
tracing::debug!(runner_id=?notif.runner_id, "runner has become eligible again");
5774

58-
ctx.signal(pegboard::workflows::runner2::CheckQueue {})
59-
.to_workflow_id(notif.workflow_id)
60-
.send()
61-
.await?;
75+
if protocol::is_mk2(conn.protocol_version) {
76+
ctx.signal(pegboard::workflows::runner2::CheckQueue {})
77+
.to_workflow_id(notif.workflow_id)
78+
.send()
79+
.await?;
80+
} else {
81+
ctx.signal(pegboard::workflows::runner::CheckQueue {})
82+
.to_workflow_id(notif.workflow_id)
83+
.send()
84+
.await?;
85+
}
6286
}
6387
}
6488

0 commit comments

Comments
 (0)