Skip to content

Commit 25ece3d

Browse files
committed
feat(gas): add batch listen
1 parent 5c9b75f commit 25ece3d

File tree

16 files changed

+1215
-732
lines changed

16 files changed

+1215
-732
lines changed

engine/packages/engine/src/util/wf/mod.rs

Lines changed: 35 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -467,6 +467,35 @@ pub async fn print_history(
467467
}
468468
}
469469
}
470+
EventData::Signals(data) => {
471+
// Indent
472+
print!("{}{c} ", " ".repeat(indent));
473+
474+
for ((signal_id, name), body) in
475+
data.signal_ids.iter().zip(&data.names).zip(&data.bodies)
476+
{
477+
// Indent
478+
print!("{}{c} - ", " ".repeat(indent));
479+
println!("{}", event_style.apply_to(name));
480+
481+
print!("{}{c} ", " ".repeat(indent));
482+
println!("id {}", style(signal_id).green());
483+
484+
if !exclude_json {
485+
// Indent
486+
print!("{}{c} ", " ".repeat(indent));
487+
488+
println!(
489+
"body {}",
490+
indent_string(
491+
&colored_json(body)?,
492+
format!("{}{c} ", " ".repeat(indent)),
493+
true
494+
)
495+
);
496+
}
497+
}
498+
}
470499
_ => {}
471500
}
472501
}
@@ -543,7 +572,7 @@ pub fn event_style(event: &Event) -> Style {
543572
EventData::Removed(_) => Style::new().red(),
544573
EventData::VersionCheck => Style::new().red(),
545574
EventData::Branch => Style::new(),
546-
EventData::Empty => Style::new(),
575+
EventData::Signals(_) => Style::new().cyan(),
547576
}
548577
}
549578

@@ -595,7 +624,11 @@ pub fn print_event_name(event: &Event) {
595624
}
596625
EventData::VersionCheck => print!("{}", style.apply_to("version check").bold()),
597626
EventData::Branch => print!("{}", style.apply_to("branch").bold()),
598-
EventData::Empty => print!("{}", style.apply_to("empty").bold()),
627+
EventData::Signals(signal) => print!(
628+
"{} {}",
629+
style.apply_to("signal receive").bold(),
630+
style.apply_to(&signal.names.len())
631+
),
599632
}
600633
}
601634

engine/packages/gasoline-macros/src/lib.rs

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -372,9 +372,13 @@ pub fn signal(attr: TokenStream, item: TokenStream) -> TokenStream {
372372

373373
#[async_trait::async_trait]
374374
impl gas::listen::Listen for #ident {
375-
async fn listen(ctx: &mut gas::prelude::ListenCtx) -> gas::prelude::WorkflowResult<Self> {
376-
let row = ctx.listen_any(&[<Self as gas::signal::Signal>::NAME]).await?;
377-
Self::parse(&row.signal_name, &row.body)
375+
async fn listen(ctx: &mut gas::prelude::ListenCtx, limit: usize) -> gas::prelude::WorkflowResult<Vec<Self>> {
376+
ctx
377+
.listen_any(&[<Self as gas::signal::Signal>::NAME], limit)
378+
.await?
379+
.into_iter()
380+
.map(|signal| Self::parse(&signal.signal_name, &signal.body))
381+
.collect()
378382
}
379383

380384
fn parse(_name: &str, body: &serde_json::value::RawValue) -> gas::prelude::WorkflowResult<Self> {

engine/packages/gasoline/src/ctx/listen.rs

Lines changed: 37 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use rivet_metrics::KeyValue;
2-
use std::{ops::Deref, time::Instant};
2+
use std::ops::Deref;
33

44
use crate::{
55
ctx::WorkflowCtx,
@@ -14,7 +14,7 @@ pub struct ListenCtx<'a> {
1414
ctx: &'a WorkflowCtx,
1515
location: &'a Location,
1616
// Used by certain db drivers to know when to update internal indexes for signal wake conditions
17-
last_try: bool,
17+
last_attempt: bool,
1818
// HACK: Prevent `ListenCtx::listen_any` from being called more than once
1919
used: bool,
2020
}
@@ -24,14 +24,14 @@ impl<'a> ListenCtx<'a> {
2424
ListenCtx {
2525
ctx,
2626
location,
27-
last_try: false,
27+
last_attempt: false,
2828
used: false,
2929
}
3030
}
3131

32-
pub(crate) fn reset(&mut self, last_try: bool) {
32+
pub(crate) fn reset(&mut self, last_attempt: bool) {
3333
self.used = false;
34-
self.last_try = last_try;
34+
self.last_attempt = last_attempt;
3535
}
3636

3737
/// Checks for a signal to this workflow with any of the given signal names.
@@ -40,75 +40,63 @@ impl<'a> ListenCtx<'a> {
4040
pub async fn listen_any(
4141
&mut self,
4242
signal_names: &[&'static str],
43-
) -> WorkflowResult<SignalData> {
43+
limit: usize,
44+
) -> WorkflowResult<Vec<SignalData>> {
4445
if self.used {
4546
return Err(WorkflowError::ListenCtxUsed);
4647
} else {
4748
self.used = true;
4849
}
4950

50-
let start_instant = Instant::now();
51-
52-
// Fetch new pending signal
53-
let signal = self
51+
// Fetch new pending signals
52+
let signals = self
5453
.ctx
5554
.db()
56-
.pull_next_signal(
55+
.pull_next_signals(
5756
self.ctx.workflow_id(),
5857
self.ctx.name(),
5958
signal_names,
6059
self.location,
6160
self.ctx.version(),
6261
self.ctx.loop_location(),
63-
self.last_try,
62+
limit,
63+
self.last_attempt,
6464
)
6565
.await?;
6666

67-
let dt = start_instant.elapsed().as_secs_f64();
68-
metrics::SIGNAL_PULL_DURATION.record(
69-
dt,
70-
&[
71-
KeyValue::new("workflow_name", self.ctx.name().to_string()),
72-
KeyValue::new(
73-
"signal_name",
74-
signal
75-
.as_ref()
76-
.map(|signal| signal.signal_name.clone())
77-
.unwrap_or("<none>".into()),
78-
),
79-
],
80-
);
81-
82-
let Some(signal) = signal else {
67+
if signals.is_empty() {
8368
return Err(WorkflowError::NoSignalFound(Box::from(signal_names)));
84-
};
69+
}
8570

86-
let recv_lag = (rivet_util::timestamp::now() as f64 - signal.create_ts as f64) / 1000.;
87-
crate::metrics::SIGNAL_RECV_LAG.record(
88-
recv_lag,
89-
&[
90-
KeyValue::new("workflow_name", self.ctx.name().to_string()),
91-
KeyValue::new("signal_name", signal.signal_name.clone()),
92-
],
93-
);
71+
let now = rivet_util::timestamp::now();
72+
for signal in &signals {
73+
let recv_lag = (now as f64 - signal.create_ts as f64) / 1000.0;
74+
metrics::SIGNAL_RECV_LAG.record(
75+
recv_lag,
76+
&[
77+
KeyValue::new("workflow_name", self.ctx.name().to_string()),
78+
KeyValue::new("signal_name", signal.signal_name.clone()),
79+
],
80+
);
81+
82+
if recv_lag > 3.0 {
83+
// We print an error here so the trace of this workflow does not get dropped
84+
tracing::error!(
85+
?recv_lag,
86+
signal_id=%signal.signal_id,
87+
signal_name=%signal.signal_name,
88+
"long signal recv time",
89+
);
90+
}
9491

95-
if recv_lag > 3.0 {
96-
// We print an error here so the trace of this workflow does not get dropped
97-
tracing::error!(
98-
?recv_lag,
92+
tracing::debug!(
9993
signal_id=%signal.signal_id,
10094
signal_name=%signal.signal_name,
101-
"long signal recv time",
95+
"signal received",
10296
);
10397
}
10498

105-
tracing::debug!(
106-
signal_id=%signal.signal_id,
107-
signal_name=%signal.signal_name,
108-
"signal received",
109-
);
110-
111-
Ok(signal)
99+
Ok(signals)
112100
}
113101
}
114102

engine/packages/gasoline/src/ctx/versioned_workflow.rs

Lines changed: 1 addition & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ use crate::{
99
builder::{WorkflowRepr, workflow as builder},
1010
ctx::{WorkflowCtx, workflow::Loop},
1111
executable::{AsyncResult, Executable},
12-
listen::{CustomListener, Listen},
12+
listen::Listen,
1313
message::Message,
1414
signal::Signal,
1515
utils::time::{DurationToMillis, TsToMillis},
@@ -125,17 +125,6 @@ impl<'a> VersionedWorkflowCtx<'a> {
125125
})
126126
}
127127

128-
/// Execute a custom listener.
129-
#[tracing::instrument(skip_all, fields(t=std::any::type_name::<T>()))]
130-
pub async fn custom_listener<T: CustomListener>(
131-
&mut self,
132-
listener: &T,
133-
) -> Result<<T as CustomListener>::Output> {
134-
wrap!(self, "listen", {
135-
self.inner.custom_listener(listener).in_current_span().await
136-
})
137-
}
138-
139128
/// Creates a message builder.
140129
pub fn msg<M: Message>(&mut self, body: M) -> builder::message::MessageBuilder<'_, M> {
141130
builder::message::MessageBuilder::new(self.inner, self.version(), body)

0 commit comments

Comments
 (0)