Skip to content

Commit e5b2d7f

Browse files
committed
fix(gas): optimize loops
1 parent bbe84e7 commit e5b2d7f

File tree

1 file changed

+77
-37
lines changed

1 file changed

+77
-37
lines changed

engine/packages/gasoline/src/ctx/workflow.rs

Lines changed: 77 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,8 @@ use crate::{
4242
const DB_ACTION_RETRY: Duration = Duration::from_millis(150);
4343
/// Most db action retries
4444
const MAX_DB_ACTION_RETRIES: usize = 5;
45+
/// How often to commit loop event data to db and mark previous loop history to forgotten
46+
const LOOP_ITERS_PER_COMMIT: usize = 20;
4547

4648
// NOTE: Cloneable because of inner arcs
4749
#[derive(Clone)]
@@ -868,34 +870,43 @@ impl WorkflowCtx {
868870
let loop_location = self.cursor.current_location_for(&history_res);
869871

870872
// Loop existed before
871-
let (mut iteration, mut state, output) =
873+
let (mut iteration, mut state, output, mut loop_event_commit_fut) =
872874
if let HistoryResult::Event(loop_event) = history_res {
873875
let state = loop_event.parse_state()?;
874876
let output = loop_event.parse_output()?;
875877

876-
(loop_event.iteration, state, output)
878+
(loop_event.iteration, state, output, None)
877879
} else {
878880
let state_val = serde_json::value::to_raw_value(&state)
879881
.map_err(WorkflowError::SerializeLoopOutput)?;
880882

881-
// Insert event before loop is run so the history is consistent
882-
self.db
883-
.upsert_workflow_loop_event(
884-
self.workflow_id,
885-
&self.name,
883+
// Clone data to move into future
884+
let loop_location = loop_location.clone();
885+
let db2 = self.db.clone();
886+
let workflow_id = self.workflow_id;
887+
let name = self.name.clone();
888+
let version = self.version;
889+
let nested_loop_location = self.loop_location().cloned();
890+
891+
// This future is deferred until later for parallelization
892+
let loop_event_commit_fut = async move {
893+
db2.upsert_workflow_loop_event(
894+
workflow_id,
895+
&name,
886896
&loop_location,
887-
self.version,
897+
version,
888898
0,
889899
&state_val,
890900
None,
891-
self.loop_location(),
901+
nested_loop_location.as_ref(),
892902
)
893-
.await?;
903+
.await
904+
};
894905

895-
(0, state, None)
906+
(0, state, None, Some(loop_event_commit_fut))
896907
};
897908

898-
// Create a branch but no branch event (loop event takes its place)
909+
// Create a branch for the loop event
899910
let mut loop_branch =
900911
self.branch_inner(self.input.clone(), self.version, loop_location.clone());
901912

@@ -923,6 +934,7 @@ impl WorkflowCtx {
923934
.root()
924935
.join(Coordinate::simple(iteration + 1)),
925936
);
937+
let iteration_branch_root = iteration_branch.cursor.root().clone();
926938

927939
// Set branch loop location to the current loop
928940
iteration_branch.loop_location = Some(loop_location.clone());
@@ -931,41 +943,68 @@ impl WorkflowCtx {
931943

932944
// Async block for instrumentation purposes
933945
let (dt2, res) = async {
934-
// Insert event if iteration is not a replay
935-
if !loop_branch.cursor.compare_loop_branch(iteration)? {
936-
self.db
937-
.commit_workflow_branch_event(
938-
self.workflow_id,
939-
iteration_branch.cursor.root(),
940-
self.version,
941-
Some(&loop_location),
942-
)
943-
.await?;
944-
}
945-
946946
let start_instant2 = Instant::now();
947+
let db2 = self.db.clone();
948+
949+
// NOTE: Great care has been taken to optimize this function. This join allows multiple
950+
// txns to run simultaneously instead of in series but is hard to read.
951+
//
952+
// 1. First, but not necessarily chronologically first because its parallelized, we
953+
// commit the loop event. This only happens on the first iteration of the loop
954+
// 2. Second, we commit the branch event for the current iteration
955+
// 3. Last, we run the user's loop code
956+
let (loop_event_commit_res, branch_commit_res, loop_res) = tokio::join!(
957+
async {
958+
if let Some(loop_event_commit_fut) = loop_event_commit_fut.take() {
959+
loop_event_commit_fut.await
960+
} else {
961+
Ok(())
962+
}
963+
},
964+
async {
965+
// Insert event if iteration is not a replay
966+
if !loop_branch.cursor.compare_loop_branch(iteration)? {
967+
db2.commit_workflow_branch_event(
968+
self.workflow_id,
969+
&iteration_branch_root,
970+
self.version,
971+
Some(&loop_location),
972+
)
973+
.await
974+
} else {
975+
Ok(())
976+
}
977+
},
978+
cb(&mut iteration_branch, &mut state),
979+
);
980+
981+
loop_event_commit_res?;
982+
branch_commit_res?;
947983

948984
// Run loop
949-
match cb(&mut iteration_branch, &mut state).await? {
985+
match loop_res? {
950986
Loop::Continue => {
951987
let dt2 = start_instant2.elapsed().as_secs_f64();
952988
iteration += 1;
953989

954990
let state_val = serde_json::value::to_raw_value(&state)
955991
.map_err(WorkflowError::SerializeLoopOutput)?;
956992

957-
self.db
958-
.upsert_workflow_loop_event(
959-
self.workflow_id,
960-
&self.name,
961-
&loop_location,
962-
self.version,
963-
iteration,
964-
&state_val,
965-
None,
966-
self.loop_location(),
967-
)
968-
.await?;
993+
// Commit workflow state to db
994+
if iteration % LOOP_ITERS_PER_COMMIT == 0 {
995+
self.db
996+
.upsert_workflow_loop_event(
997+
self.workflow_id,
998+
&self.name,
999+
&loop_location,
1000+
self.version,
1001+
iteration,
1002+
&state_val,
1003+
None,
1004+
self.loop_location(),
1005+
)
1006+
.await?;
1007+
}
9691008

9701009
anyhow::Ok((dt2, None))
9711010
}
@@ -978,6 +1017,7 @@ impl WorkflowCtx {
9781017
let output_val = serde_json::value::to_raw_value(&res)
9791018
.map_err(WorkflowError::SerializeLoopOutput)?;
9801019

1020+
// Commit loop output and final state to db
9811021
self.db
9821022
.upsert_workflow_loop_event(
9831023
self.workflow_id,

0 commit comments

Comments
 (0)