Skip to content

Commit 4294775

Browse files
committed
fix(gas): fix loop forgotten bug due to concurrency
1 parent e68ccc5 commit 4294775

File tree

6 files changed

+44
-20
lines changed

6 files changed

+44
-20
lines changed

Cargo.toml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -433,3 +433,8 @@ debug = false
433433
lto = "fat"
434434
codegen-units = 1
435435
opt-level = 3
436+
437+
# strip = true
438+
# panic = "abort"
439+
# overflow-checks = false
440+
# debug-assertions = false

engine/packages/gasoline/src/db/kv/mod.rs

Lines changed: 29 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -2579,23 +2579,41 @@ impl Database for DatabaseKv {
25792579
keys::history::HistorySubspaceVariant::Forgotten,
25802580
));
25812581

2582-
let loop_events_subspace =
2583-
self.subspace
2584-
.subspace(&keys::history::EventHistorySubspaceKey::entire(
2585-
from_workflow_id,
2586-
location.clone(),
2587-
false,
2588-
));
2582+
// Start is {loop location, 0, ...}
2583+
let loop_events_subspace_start = self
2584+
.subspace
2585+
.subspace(&keys::history::EventHistorySubspaceKey::entire(
2586+
from_workflow_id,
2587+
location.clone(),
2588+
false,
2589+
))
2590+
.range()
2591+
.0;
2592+
// End is {loop location, iteration - 1, ...}
2593+
let loop_events_subspace_end = self
2594+
.subspace
2595+
.subspace(&keys::history::EventHistorySubspaceKey::new(
2596+
from_workflow_id,
2597+
location.clone(),
2598+
iteration.saturating_sub(1),
2599+
false,
2600+
))
2601+
.range()
2602+
.1;
25892603

25902604
let mut stream = tx.get_ranges_keyvalues(
25912605
universaldb::RangeOption {
25922606
mode: StreamingMode::WantAll,
2593-
..(&loop_events_subspace).into()
2607+
..(
2608+
loop_events_subspace_start.as_slice(),
2609+
loop_events_subspace_end.as_slice(),
2610+
)
2611+
.into()
25942612
},
25952613
Serializable,
25962614
);
25972615

2598-
// Move all current events under this loop to the forgotten history
2616+
// Move all events under this loop up to the current iteration to the forgotten history
25992617
loop {
26002618
let Some(entry) = stream.try_next().await? else {
26012619
break;
@@ -2605,15 +2623,15 @@ impl Database for DatabaseKv {
26052623
return Err(universaldb::tuple::PackError::BadPrefix.into());
26062624
}
26072625

2608-
// Truncate tuple up to ACTIVE and replace it with FORGOTTEN
2626+
// Truncate tuple up to ...ACTIVE and replace it with ...FORGOTTEN
26092627
let truncated_key = &entry.key()[active_history_subspace.bytes().len()..];
26102628
let forgotten_key =
26112629
[forgotten_history_subspace.bytes(), truncated_key].concat();
26122630

26132631
tx.set(&forgotten_key, entry.value());
26142632
}
26152633

2616-
tx.clear_subspace_range(&loop_events_subspace);
2634+
tx.clear_range(&loop_events_subspace_start, &loop_events_subspace_end);
26172635

26182636
// Only retain last 100 events in forgotten history
26192637
if iteration > 100 {

engine/packages/pegboard-runner/src/conn.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,7 @@ pub async fn init_conn(
128128

129129
// Spawn a new runner workflow if one doesn't already exist
130130
let workflow_id = ctx
131-
.workflow(pegboard::workflows::runner::Input {
131+
.workflow(pegboard::workflows::runner2::Input {
132132
runner_id,
133133
namespace_id: namespace.namespace_id,
134134
name: name.clone(),

engine/packages/pegboard/src/workflows/runner2.rs

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -103,10 +103,8 @@ pub async fn pegboard_runner2(ctx: &mut WorkflowCtx, input: &Input) -> Result<()
103103
Main::Forward(sig) => {
104104
match sig.inner {
105105
protocol::ToServer::ToServerInit(init_sig) => {
106-
if init.is_none() {
107-
init = Some(init_sig);
108-
check_queue = true;
109-
}
106+
init = Some(init_sig);
107+
check_queue = true;
110108
}
111109
protocol::ToServer::ToServerEvents(new_events) => {
112110
// Ignore events that were already received
@@ -234,8 +232,7 @@ pub async fn pegboard_runner2(ctx: &mut WorkflowCtx, input: &Input) -> Result<()
234232
let last_event_idx = events.last().map(|event| event.index);
235233

236234
// NOTE: This should not be parallelized because signals should be sent in order
237-
// Forward to actor workflows
238-
// Process events
235+
// Forward events to actor workflows
239236
for event in &events {
240237
let actor_id = crate::utils::event_actor_id(&event.inner).to_string();
241238
let res = ctx

scripts/docker/build-push.sh

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,12 @@ for tag in "${TAGS[@]}"; do
2323
BUILD_TAG_ARGS+=("-t" "${IMAGE_REPO}:${tag}")
2424
done
2525

26-
docker build -f "${DOCKERFILE}" --target "${TARGET}" --platform linux/x86_64 --build-arg BUILD_FRONTEND=true "${BUILD_TAG_ARGS[@]}" "${CONTEXT}"
26+
docker build -f "${DOCKERFILE}" --target "${TARGET}" \
27+
--platform linux/x86_64 \
28+
--build-arg BUILD_FRONTEND=true \
29+
# --build-arg CARGO_BUILD_MODE=release \
30+
"${BUILD_TAG_ARGS[@]}" \
31+
"${CONTEXT}"
2732

2833
echo "Pushing images..."
2934
for tag in "${TAGS[@]}"; do

scripts/run/docker/engine-rocksdb.sh

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,4 +13,3 @@ RIVET__PEGBOARD__RESCHEDULE_BACKOFF_MAX_EXPONENT="1" \
1313
RIVET__PEGBOARD__RUNNER_ELIGIBLE_THRESHOLD="5000" \
1414
RIVET__PEGBOARD__RUNNER_LOST_THRESHOLD="7000" \
1515
cargo run --bin rivet-engine -- start "$@" | tee -i /tmp/rivet-engine.log
16-

0 commit comments

Comments
 (0)