Skip to content

Commit 5972ed8

Browse files
committed
fix(gas): fix loop forgotten bug due to concurrency
1 parent ff15327 commit 5972ed8

File tree

6 files changed

+44
-20
lines changed

6 files changed

+44
-20
lines changed

Cargo.toml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -432,3 +432,8 @@ debug = false
432432
lto = "fat"
433433
codegen-units = 1
434434
opt-level = 3
435+
436+
# strip = true
437+
# panic = "abort"
438+
# overflow-checks = false
439+
# debug-assertions = false

engine/packages/gasoline/src/db/kv/mod.rs

Lines changed: 29 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -2576,23 +2576,41 @@ impl Database for DatabaseKv {
25762576
keys::history::HistorySubspaceVariant::Forgotten,
25772577
));
25782578

2579-
let loop_events_subspace =
2580-
self.subspace
2581-
.subspace(&keys::history::EventHistorySubspaceKey::entire(
2582-
from_workflow_id,
2583-
location.clone(),
2584-
false,
2585-
));
2579+
// Start is {loop location, 0, ...}
2580+
let loop_events_subspace_start = self
2581+
.subspace
2582+
.subspace(&keys::history::EventHistorySubspaceKey::entire(
2583+
from_workflow_id,
2584+
location.clone(),
2585+
false,
2586+
))
2587+
.range()
2588+
.0;
2589+
// End is {loop location, iteration - 1, ...}
2590+
let loop_events_subspace_end = self
2591+
.subspace
2592+
.subspace(&keys::history::EventHistorySubspaceKey::new(
2593+
from_workflow_id,
2594+
location.clone(),
2595+
iteration.saturating_sub(1),
2596+
false,
2597+
))
2598+
.range()
2599+
.1;
25862600

25872601
let mut stream = tx.get_ranges_keyvalues(
25882602
universaldb::RangeOption {
25892603
mode: StreamingMode::WantAll,
2590-
..(&loop_events_subspace).into()
2604+
..(
2605+
loop_events_subspace_start.as_slice(),
2606+
loop_events_subspace_end.as_slice(),
2607+
)
2608+
.into()
25912609
},
25922610
Serializable,
25932611
);
25942612

2595-
// Move all current events under this loop to the forgotten history
2613+
// Move all events under this loop up to the current iteration to the forgotten history
25962614
loop {
25972615
let Some(entry) = stream.try_next().await? else {
25982616
break;
@@ -2602,15 +2620,15 @@ impl Database for DatabaseKv {
26022620
return Err(universaldb::tuple::PackError::BadPrefix.into());
26032621
}
26042622

2605-
// Truncate tuple up to ACTIVE and replace it with FORGOTTEN
2623+
// Truncate tuple up to ...ACTIVE and replace it with ...FORGOTTEN
26062624
let truncated_key = &entry.key()[active_history_subspace.bytes().len()..];
26072625
let forgotten_key =
26082626
[forgotten_history_subspace.bytes(), truncated_key].concat();
26092627

26102628
tx.set(&forgotten_key, entry.value());
26112629
}
26122630

2613-
tx.clear_subspace_range(&loop_events_subspace);
2631+
tx.clear_range(&loop_events_subspace_start, &loop_events_subspace_end);
26142632

26152633
// Only retain last 100 events in forgotten history
26162634
if iteration > 100 {

engine/packages/pegboard-runner/src/conn.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,7 @@ pub async fn init_conn(
128128

129129
// Spawn a new runner workflow if one doesn't already exist
130130
let workflow_id = ctx
131-
.workflow(pegboard::workflows::runner::Input {
131+
.workflow(pegboard::workflows::runner2::Input {
132132
runner_id,
133133
namespace_id: namespace.namespace_id,
134134
name: name.clone(),

engine/packages/pegboard/src/workflows/runner2.rs

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -103,10 +103,8 @@ pub async fn pegboard_runner2(ctx: &mut WorkflowCtx, input: &Input) -> Result<()
103103
Main::Forward(sig) => {
104104
match sig.inner {
105105
protocol::ToServer::ToServerInit(init_sig) => {
106-
if init.is_none() {
107-
init = Some(init_sig);
108-
check_queue = true;
109-
}
106+
init = Some(init_sig);
107+
check_queue = true;
110108
}
111109
protocol::ToServer::ToServerEvents(new_events) => {
112110
// Ignore events that were already received
@@ -234,8 +232,7 @@ pub async fn pegboard_runner2(ctx: &mut WorkflowCtx, input: &Input) -> Result<()
234232
let last_event_idx = events.last().map(|event| event.index);
235233

236234
// NOTE: This should not be parallelized because signals should be sent in order
237-
// Forward to actor workflows
238-
// Process events
235+
// Forward events to actor workflows
239236
for event in &events {
240237
let actor_id = crate::utils::event_actor_id(&event.inner).to_string();
241238
let res = ctx

scripts/docker/build-push.sh

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,12 @@ for tag in "${TAGS[@]}"; do
2323
BUILD_TAG_ARGS+=("-t" "${IMAGE_REPO}:${tag}")
2424
done
2525

26-
docker build -f "${DOCKERFILE}" --target "${TARGET}" --platform linux/x86_64 --build-arg BUILD_FRONTEND=true "${BUILD_TAG_ARGS[@]}" "${CONTEXT}"
26+
docker build -f "${DOCKERFILE}" --target "${TARGET}" \
27+
--platform linux/x86_64 \
28+
--build-arg BUILD_FRONTEND=true \
29+
# --build-arg CARGO_BUILD_MODE=release \
30+
"${BUILD_TAG_ARGS[@]}" \
31+
"${CONTEXT}"
2732

2833
echo "Pushing images..."
2934
for tag in "${TAGS[@]}"; do

scripts/run/docker/engine-rocksdb.sh

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,4 +15,3 @@ RIVET__PEGBOARD__RESCHEDULE_BACKOFF_MAX_EXPONENT="1" \
1515
RIVET__PEGBOARD__RUNNER_ELIGIBLE_THRESHOLD="5000" \
1616
RIVET__PEGBOARD__RUNNER_LOST_THRESHOLD="7000" \
1717
cargo run --bin rivet-engine -- start "$@" 2>&1 | tee -i /tmp/rivet-engine.log
18-

0 commit comments

Comments
 (0)