@@ -42,6 +42,8 @@ use crate::{
4242const DB_ACTION_RETRY : Duration = Duration :: from_millis ( 150 ) ;
4343/// Maximum number of db action retries
4444const MAX_DB_ACTION_RETRIES : usize = 5 ;
45+ /// How many loop iterations to run between committing loop event data to the db and marking previous loop history as forgotten
46+ const LOOP_ITERS_PER_COMMIT : usize = 20 ;
4547
4648// NOTE: Cloneable because of inner arcs
4749#[ derive( Clone ) ]
@@ -868,34 +870,43 @@ impl WorkflowCtx {
868870 let loop_location = self . cursor . current_location_for ( & history_res) ;
869871
870872 // Loop existed before
871- let ( mut iteration, mut state, output) =
873+ let ( mut iteration, mut state, output, mut loop_event_commit_fut ) =
872874 if let HistoryResult :: Event ( loop_event) = history_res {
873875 let state = loop_event. parse_state ( ) ?;
874876 let output = loop_event. parse_output ( ) ?;
875877
876- ( loop_event. iteration , state, output)
878+ ( loop_event. iteration , state, output, None )
877879 } else {
878880 let state_val = serde_json:: value:: to_raw_value ( & state)
879881 . map_err ( WorkflowError :: SerializeLoopOutput ) ?;
880882
881- // Insert event before loop is run so the history is consistent
882- self . db
883- . upsert_workflow_loop_event (
884- self . workflow_id ,
885- & self . name ,
883+ // Clone data to move into future
884+ let loop_location = loop_location. clone ( ) ;
885+ let db2 = self . db . clone ( ) ;
886+ let workflow_id = self . workflow_id ;
887+ let name = self . name . clone ( ) ;
888+ let version = self . version ;
889+ let nested_loop_location = self . loop_location ( ) . cloned ( ) ;
890+
891+ // This future is deferred until later for parallelization
892+ let loop_event_commit_fut = async move {
893+ db2. upsert_workflow_loop_event (
894+ workflow_id,
895+ & name,
886896 & loop_location,
887- self . version ,
897+ version,
888898 0 ,
889899 & state_val,
890900 None ,
891- self . loop_location ( ) ,
901+ nested_loop_location . as_ref ( ) ,
892902 )
893- . await ?;
903+ . await
904+ } ;
894905
895- ( 0 , state, None )
906+ ( 0 , state, None , Some ( loop_event_commit_fut ) )
896907 } ;
897908
898- // Create a branch but no branch event ( loop event takes its place)
909+ // Create a branch for the loop event
899910 let mut loop_branch =
900911 self . branch_inner ( self . input . clone ( ) , self . version , loop_location. clone ( ) ) ;
901912
@@ -923,6 +934,7 @@ impl WorkflowCtx {
923934 . root ( )
924935 . join ( Coordinate :: simple ( iteration + 1 ) ) ,
925936 ) ;
937+ let iteration_branch_root = iteration_branch. cursor . root ( ) . clone ( ) ;
926938
927939 // Set branch loop location to the current loop
928940 iteration_branch. loop_location = Some ( loop_location. clone ( ) ) ;
@@ -931,41 +943,68 @@ impl WorkflowCtx {
931943
932944 // Async block for instrumentation purposes
933945 let ( dt2, res) = async {
934- // Insert event if iteration is not a replay
935- if !loop_branch. cursor . compare_loop_branch ( iteration) ? {
936- self . db
937- . commit_workflow_branch_event (
938- self . workflow_id ,
939- iteration_branch. cursor . root ( ) ,
940- self . version ,
941- Some ( & loop_location) ,
942- )
943- . await ?;
944- }
945-
946946 let start_instant2 = Instant :: now ( ) ;
947+ let db2 = self . db . clone ( ) ;
948+
949+ // NOTE: Great care has been taken to optimize this function. This join allows multiple
950+ // txns to run simultaneously instead of in series but is hard to read.
951+ //
952+ // 1. First, but not necessarily chronologically first because it's parallelized, we
953+ // commit the loop event. This only happens on the first iteration of the loop
954+ // 2. Second, we commit the branch event for the current iteration
955+ // 3. Last, we run the user's loop code
956+ let ( loop_event_commit_res, branch_commit_res, loop_res) = tokio:: join!(
957+ async {
958+ if let Some ( loop_event_commit_fut) = loop_event_commit_fut. take( ) {
959+ loop_event_commit_fut. await
960+ } else {
961+ Ok ( ( ) )
962+ }
963+ } ,
964+ async {
965+ // Insert event if iteration is not a replay
966+ if !loop_branch. cursor. compare_loop_branch( iteration) ? {
967+ db2. commit_workflow_branch_event(
968+ self . workflow_id,
969+ & iteration_branch_root,
970+ self . version,
971+ Some ( & loop_location) ,
972+ )
973+ . await
974+ } else {
975+ Ok ( ( ) )
976+ }
977+ } ,
978+ cb( & mut iteration_branch, & mut state) ,
979+ ) ;
980+
981+ loop_event_commit_res?;
982+ branch_commit_res?;
947983
948984 // Run loop
949- match cb ( & mut iteration_branch , & mut state ) . await ? {
985+ match loop_res ? {
950986 Loop :: Continue => {
951987 let dt2 = start_instant2. elapsed ( ) . as_secs_f64 ( ) ;
952988 iteration += 1 ;
953989
954990 let state_val = serde_json:: value:: to_raw_value ( & state)
955991 . map_err ( WorkflowError :: SerializeLoopOutput ) ?;
956992
957- self . db
958- . upsert_workflow_loop_event (
959- self . workflow_id ,
960- & self . name ,
961- & loop_location,
962- self . version ,
963- iteration,
964- & state_val,
965- None ,
966- self . loop_location ( ) ,
967- )
968- . await ?;
993+ // Commit workflow state to db
994+ if iteration % LOOP_ITERS_PER_COMMIT == 0 {
995+ self . db
996+ . upsert_workflow_loop_event (
997+ self . workflow_id ,
998+ & self . name ,
999+ & loop_location,
1000+ self . version ,
1001+ iteration,
1002+ & state_val,
1003+ None ,
1004+ self . loop_location ( ) ,
1005+ )
1006+ . await ?;
1007+ }
9691008
9701009 anyhow:: Ok ( ( dt2, None ) )
9711010 }
@@ -978,6 +1017,7 @@ impl WorkflowCtx {
9781017 let output_val = serde_json:: value:: to_raw_value ( & res)
9791018 . map_err ( WorkflowError :: SerializeLoopOutput ) ?;
9801019
1020+ // Commit loop output and final state to db
9811021 self . db
9821022 . upsert_workflow_loop_event (
9831023 self . workflow_id ,
0 commit comments