Skip to content

Commit 0ca2fa2

Browse files
committed
fix(gas): fix batch listen, fix history for graceful signal send in workflows
1 parent 25ece3d commit 0ca2fa2

File tree

23 files changed

+764
-906
lines changed

23 files changed

+764
-906
lines changed

engine/packages/api-peer/src/actors/delete.rs

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -67,20 +67,14 @@ pub async fn delete(ctx: ApiCtx, path: DeletePath, query: DeleteQuery) -> Result
6767
.signal(pegboard::workflows::actor::Destroy {})
6868
.to_workflow::<pegboard::workflows::actor::Workflow>()
6969
.tag("actor_id", path.actor_id)
70+
.graceful_not_found()
7071
.send()
71-
.await;
72-
73-
if let Some(WorkflowError::WorkflowNotFound) = res
74-
.as_ref()
75-
.err()
76-
.and_then(|x| x.chain().find_map(|x| x.downcast_ref::<WorkflowError>()))
77-
{
72+
.await?;
73+
if res.is_none() {
7874
tracing::warn!(
7975
actor_id=?path.actor_id,
8076
"actor workflow not found, likely already stopped"
8177
);
82-
} else {
83-
res?;
8478
}
8579

8680
Ok(DeleteResponse {})

engine/packages/engine/src/util/wf/mod.rs

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -468,22 +468,19 @@ pub async fn print_history(
468468
}
469469
}
470470
EventData::Signals(data) => {
471-
// Indent
472-
print!("{}{c} ", " ".repeat(indent));
473-
474471
for ((signal_id, name), body) in
475472
data.signal_ids.iter().zip(&data.names).zip(&data.bodies)
476473
{
477474
// Indent
478-
print!("{}{c} - ", " ".repeat(indent));
475+
print!("{}{c} - ", " ".repeat(indent));
479476
println!("{}", event_style.apply_to(name));
480477

481-
print!("{}{c} ", " ".repeat(indent));
478+
print!("{}{c} ", " ".repeat(indent));
482479
println!("id {}", style(signal_id).green());
483480

484481
if !exclude_json {
485482
// Indent
486-
print!("{}{c} ", " ".repeat(indent));
483+
print!("{}{c} ", " ".repeat(indent));
487484

488485
println!(
489486
"body {}",
@@ -590,7 +587,7 @@ pub fn print_event_name(event: &Event) {
590587
),
591588
EventData::Signal(signal) => print!(
592589
"{} {}",
593-
style.apply_to("signal receive").bold(),
590+
style.apply_to("signal").bold(),
594591
style.apply_to(&signal.name)
595592
),
596593
EventData::SignalSend(signal_send) => print!(
@@ -626,7 +623,7 @@ pub fn print_event_name(event: &Event) {
626623
EventData::Branch => print!("{}", style.apply_to("branch").bold()),
627624
EventData::Signals(signal) => print!(
628625
"{} {}",
629-
style.apply_to("signal receive").bold(),
626+
style.apply_to("signal recv").bold(),
630627
style.apply_to(&signal.names.len())
631628
),
632629
}

engine/packages/gasoline/src/builder/common/signal.rs

Lines changed: 29 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ pub struct SignalBuilder<T: Signal + Serialize> {
1818
to_workflow_name: Option<&'static str>,
1919
to_workflow_id: Option<Id>,
2020
tags: serde_json::Map<String, serde_json::Value>,
21+
graceful_not_found: bool,
2122
error: Option<BuilderError>,
2223
}
2324

@@ -37,6 +38,7 @@ impl<T: Signal + Serialize> SignalBuilder<T> {
3738
to_workflow_name: None,
3839
to_workflow_id: None,
3940
tags: serde_json::Map::new(),
41+
graceful_not_found: false,
4042
error: from_workflow.then_some(BuilderError::CannotDispatchFromOpInWorkflow),
4143
}
4244
}
@@ -102,8 +104,21 @@ impl<T: Signal + Serialize> SignalBuilder<T> {
102104
self
103105
}
104106

107+
/// Makes `send` return `Ok(None)` instead of an error when the signal target is not found.
108+
pub fn graceful_not_found(mut self) -> Self {
109+
if self.error.is_some() {
110+
return self;
111+
}
112+
113+
self.graceful_not_found = true;
114+
115+
self
116+
}
117+
118+
/// Returns the signal id that was just sent. Unless `graceful_not_found` is set and the workflow does not
119+
/// exist, will always return `Some`.
105120
#[tracing::instrument(skip_all, fields(signal_name=T::NAME, signal_id))]
106-
pub async fn send(self) -> Result<Id> {
121+
pub async fn send(self) -> Result<Option<Id>> {
107122
if let Some(err) = self.error {
108123
return Err(err.into());
109124
}
@@ -132,8 +147,18 @@ impl<T: Signal + Serialize> SignalBuilder<T> {
132147
let workflow_id = self
133148
.db
134149
.find_workflow(workflow_name, &serde_json::Value::Object(self.tags))
135-
.await?
136-
.ok_or(WorkflowError::WorkflowNotFound)?;
150+
.await?;
151+
152+
let Some(workflow_id) = workflow_id else {
153+
// Handle signal target not found gracefully
154+
if self.graceful_not_found {
155+
tracing::debug!("signal target not found");
156+
157+
return Ok(None);
158+
} else {
159+
return Err(WorkflowError::WorkflowNotFound.into());
160+
}
161+
};
137162

138163
self.db
139164
.publish_signal(self.ray_id, workflow_id, signal_id, T::NAME, &input_val)
@@ -188,6 +213,6 @@ impl<T: Signal + Serialize> SignalBuilder<T> {
188213
],
189214
);
190215

191-
Ok(signal_id)
216+
Ok(Some(signal_id))
192217
}
193218
}

engine/packages/gasoline/src/builder/workflow/signal.rs

Lines changed: 64 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,13 @@ use rivet_util::Id;
66
use serde::Serialize;
77

88
use crate::{
9-
builder::BuilderError, ctx::WorkflowCtx, error::WorkflowError, history::cursor::HistoryResult,
10-
metrics, signal::Signal, workflow::Workflow,
9+
builder::BuilderError,
10+
ctx::WorkflowCtx,
11+
error::WorkflowError,
12+
history::{cursor::HistoryResult, event::EventType, removed::Signal as RemovedSignal},
13+
metrics,
14+
signal::Signal,
15+
workflow::Workflow,
1116
};
1217

1318
pub struct SignalBuilder<'a, T: Signal + Serialize> {
@@ -18,6 +23,7 @@ pub struct SignalBuilder<'a, T: Signal + Serialize> {
1823
to_workflow_name: Option<&'static str>,
1924
to_workflow_id: Option<Id>,
2025
tags: serde_json::Map<String, serde_json::Value>,
26+
graceful_not_found: bool,
2127
error: Option<BuilderError>,
2228
}
2329

@@ -31,6 +37,7 @@ impl<'a, T: Signal + Serialize> SignalBuilder<'a, T> {
3137
to_workflow_name: None,
3238
to_workflow_id: None,
3339
tags: serde_json::Map::new(),
40+
graceful_not_found: false,
3441
error: None,
3542
}
3643
}
@@ -85,14 +92,39 @@ impl<'a, T: Signal + Serialize> SignalBuilder<'a, T> {
8592
self
8693
}
8794

95+
/// Makes `send` return `Ok(None)` instead of an error when the signal target is not found.
96+
pub fn graceful_not_found(mut self) -> Self {
97+
if self.error.is_some() {
98+
return self;
99+
}
100+
101+
self.graceful_not_found = true;
102+
103+
self
104+
}
105+
106+
/// Returns the signal id that was just sent. Unless `graceful_not_found` is set and the workflow does not
107+
/// exist, will always return `Some`.
88108
#[tracing::instrument(skip_all, fields(signal_name=T::NAME, signal_id))]
89-
pub async fn send(self) -> Result<Id> {
109+
pub async fn send(self) -> Result<Option<Id>> {
90110
self.ctx.check_stop()?;
91111

92112
if let Some(err) = self.error {
93113
return Err(err.into());
94114
}
95115

116+
// Check if this signal is being replayed and previously had no target (will have a removed event)
117+
if self.graceful_not_found && self.ctx.cursor().is_removed() {
118+
self.ctx.cursor().compare_removed::<RemovedSignal<T>>()?;
119+
120+
tracing::debug!("replaying gracefully not found signal dispatch");
121+
122+
// Move to next event
123+
self.ctx.cursor_mut().inc();
124+
125+
return Ok(None);
126+
}
127+
96128
// Error for version mismatch. This is done in the builder instead of in `VersionedWorkflowCtx` to
97129
// defer the error.
98130
self.ctx.compare_version("signal", self.version)?;
@@ -105,7 +137,7 @@ impl<'a, T: Signal + Serialize> SignalBuilder<'a, T> {
105137

106138
// Signal sent before
107139
let signal_id = if let HistoryResult::Event(signal) = history_res {
108-
tracing::debug!("replaying signal dispatch",);
140+
tracing::debug!("replaying signal dispatch");
109141

110142
signal.signal_id
111143
}
@@ -133,8 +165,33 @@ impl<'a, T: Signal + Serialize> SignalBuilder<'a, T> {
133165
.ctx
134166
.db()
135167
.find_workflow(workflow_name, &serde_json::Value::Object(self.tags))
136-
.await?
137-
.ok_or(WorkflowError::WorkflowNotFound)?;
168+
.await?;
169+
170+
let Some(workflow_id) = workflow_id else {
171+
// Handle signal target not found gracefully
172+
if self.graceful_not_found {
173+
tracing::debug!("signal target not found");
174+
175+
// Insert removed event
176+
self.ctx
177+
.db()
178+
.commit_workflow_removed_event(
179+
self.ctx.workflow_id(),
180+
&location,
181+
EventType::SignalSend,
182+
Some(T::NAME),
183+
self.ctx.loop_location(),
184+
)
185+
.await?;
186+
187+
// Move to next event
188+
self.ctx.cursor_mut().update(&location);
189+
190+
return Ok(None);
191+
} else {
192+
return Err(WorkflowError::WorkflowNotFound.into());
193+
}
194+
};
138195

139196
self.ctx
140197
.db()
@@ -222,6 +279,6 @@ impl<'a, T: Signal + Serialize> SignalBuilder<'a, T> {
222279
// Move to next event
223280
self.ctx.cursor_mut().update(&location);
224281

225-
Ok(signal_id)
282+
Ok(Some(signal_id))
226283
}
227284
}

engine/packages/gasoline/src/ctx/workflow.rs

Lines changed: 44 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -819,7 +819,7 @@ impl WorkflowCtx {
819819
let loop_location = self.cursor.current_location_for(&history_res);
820820

821821
// Loop existed before
822-
let (mut iteration, mut state, output, mut loop_event_commit_fut) =
822+
let (mut iteration, mut state, output, mut loop_event_init_fut) =
823823
if let HistoryResult::Event(loop_event) = history_res {
824824
let state = loop_event.parse_state()?;
825825
let output = loop_event.parse_output()?;
@@ -838,7 +838,7 @@ impl WorkflowCtx {
838838
let nested_loop_location = self.loop_location().cloned();
839839

840840
// This future is deferred until later for parallelization
841-
let loop_event_commit_fut = async move {
841+
let loop_event_init_fut = async move {
842842
db2.upsert_workflow_loop_event(
843843
workflow_id,
844844
&name,
@@ -852,7 +852,7 @@ impl WorkflowCtx {
852852
.await
853853
};
854854

855-
(0, state, None, Some(loop_event_commit_fut))
855+
(0, state, None, Some(loop_event_init_fut))
856856
};
857857

858858
// Create a branch for the loop event
@@ -869,6 +869,9 @@ impl WorkflowCtx {
869869
else {
870870
tracing::debug!("running loop");
871871

872+
// Used to defer loop upsertion for parallelization
873+
let mut loop_event_upsert_fut = None;
874+
872875
loop {
873876
self.check_stop()?;
874877

@@ -898,14 +901,23 @@ impl WorkflowCtx {
898901
// NOTE: Great care has been taken to optimize this function. This join allows multiple
899902
// txns to run simultaneously instead of in series but is hard to read.
900903
//
901-
// 1. First, but not necessarily chronologically first because its parallelized, we
904+
// 1. First (but not necessarily chronologically first because it's parallelized), we
902905
// commit the loop event. This only happens on the first iteration of the loop
903906
// 2. Second, we commit the branch event for the current iteration
904-
// 3. Last, we run the user's loop code
905-
let (loop_event_commit_res, branch_commit_res, loop_res) = tokio::join!(
907+
// 3. Third, we run the user's loop code
908+
// 4. Last, if we have to upsert the loop event, we save the future and process it in the
909+
// next iteration of the loop as part of this join
910+
let (loop_event_commit_res, loop_event_upsert_res, branch_commit_res, loop_res) = tokio::join!(
906911
async {
907-
if let Some(loop_event_commit_fut) = loop_event_commit_fut.take() {
908-
loop_event_commit_fut.await
912+
if let Some(loop_event_init_fut) = loop_event_init_fut.take() {
913+
loop_event_init_fut.await
914+
} else {
915+
Ok(())
916+
}
917+
},
918+
async {
919+
if let Some(loop_event_upsert_fut) = loop_event_upsert_fut.take() {
920+
loop_event_upsert_fut.await
909921
} else {
910922
Ok(())
911923
}
@@ -928,6 +940,7 @@ impl WorkflowCtx {
928940
);
929941

930942
loop_event_commit_res?;
943+
loop_event_upsert_res?;
931944
branch_commit_res?;
932945

933946
// Run loop
@@ -936,23 +949,33 @@ impl WorkflowCtx {
936949
let dt2 = start_instant2.elapsed().as_secs_f64();
937950
iteration += 1;
938951

939-
let state_val = serde_json::value::to_raw_value(&state)
940-
.map_err(WorkflowError::SerializeLoopOutput)?;
941-
942952
// Commit workflow state to db
943953
if iteration % LOOP_ITERS_PER_COMMIT == 0 {
944-
self.db
945-
.upsert_workflow_loop_event(
946-
self.workflow_id,
947-
&self.name,
954+
let state_val = serde_json::value::to_raw_value(&state)
955+
.map_err(WorkflowError::SerializeLoopOutput)?;
956+
957+
// Clone data to move into future
958+
let loop_location = loop_location.clone();
959+
let db2 = self.db.clone();
960+
let workflow_id = self.workflow_id;
961+
let name = self.name.clone();
962+
let version = self.version;
963+
let nested_loop_location = self.loop_location().cloned();
964+
965+
// Defer upsertion to next iteration so it runs in parallel
966+
loop_event_upsert_fut = Some(async move {
967+
db2.upsert_workflow_loop_event(
968+
workflow_id,
969+
&name,
948970
&loop_location,
949-
self.version,
971+
version,
950972
iteration,
951973
&state_val,
952974
None,
953-
self.loop_location(),
975+
nested_loop_location.as_ref(),
954976
)
955-
.await?;
977+
.await
978+
});
956979
}
957980

958981
anyhow::Ok((dt2, None))
@@ -966,7 +989,8 @@ impl WorkflowCtx {
966989
let output_val = serde_json::value::to_raw_value(&res)
967990
.map_err(WorkflowError::SerializeLoopOutput)?;
968991

969-
// Commit loop output and final state to db
992+
// Commit loop output and final state to db. Note that we don't defer this because
993+
// there will be no more loop iterations afterwards.
970994
self.db
971995
.upsert_workflow_loop_event(
972996
self.workflow_id,
@@ -1338,7 +1362,7 @@ impl WorkflowCtx {
13381362

13391363
// Existing event
13401364
if self.cursor.compare_removed::<T>()? {
1341-
tracing::debug!("skipping removed step",);
1365+
tracing::debug!("skipping removed step");
13421366
}
13431367
// New "removed" event
13441368
else {

0 commit comments

Comments
 (0)