Skip to content

Commit 6af6fec

Browse files
authored
fix: dont purge cache for actor keys (#3204)
* fix(api-public): do not pass user headers to forward to dc * chore(guard): log runner routing * fix: dont purge cache for actor keys
1 parent 6d7271b commit 6af6fec

File tree

9 files changed

+49
-14
lines changed

9 files changed

+49
-14
lines changed

packages/common/api-util/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ use anyhow::{Context, Result};
22
use axum::{body::Body, response::Response};
33
use futures_util::StreamExt;
44
use rivet_api_builder::{ApiCtx, ErrorResponse, RawErrorResponse};
5-
use serde::{de::DeserializeOwned, Serialize};
5+
use serde::{Serialize, de::DeserializeOwned};
66
use std::future::Future;
77

88
mod errors;

packages/common/runtime/src/traces.rs

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -32,10 +32,7 @@ pub fn init_tracing_subscriber(otel_providers: &Option<OtelProviderGuard>) {
3232
let otel_metric_layer = MetricsLayer::new(providers.meter_provider.clone())
3333
.with_filter(build_filter_from_env_var("RUST_TRACE"));
3434

35-
(
36-
Some(otel_trace_layer),
37-
Some(otel_metric_layer),
38-
)
35+
(Some(otel_trace_layer), Some(otel_metric_layer))
3936
}
4037
None => (None, None),
4138
};

packages/core/api-public/src/health.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use anyhow::Result;
2-
use axum::{extract::Extension, response::IntoResponse, Json};
2+
use axum::{Json, extract::Extension, response::IntoResponse};
33
use futures_util::StreamExt;
44
use rivet_api_builder::ApiError;
55
use serde::{Deserialize, Serialize};

packages/core/api-public/src/router.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,14 @@ use axum::{
33
middleware::{self, Next},
44
response::{IntoResponse, Redirect, Response},
55
};
6-
use reqwest::header::{HeaderMap, AUTHORIZATION};
6+
use reqwest::header::{AUTHORIZATION, HeaderMap};
77
use rivet_api_builder::{create_router, extract::FailedExtraction};
88
use tower_http::cors::CorsLayer;
99
use utoipa::OpenApi;
1010

11-
use crate::{actors, ctx, datacenters, health, metadata, namespaces, runner_configs, runners, ui};
11+
use crate::{
12+
actors, ctx, datacenters, health, metadata, namespaces, runner_configs, runners, ui,
13+
};
1214

1315
#[derive(OpenApi)]
1416
#[openapi(

packages/services/epoxy/src/ops/explicit_prepare.rs

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -79,9 +79,15 @@ pub async fn epoxy_explicit_prepare(
7979
let result = match analyze_prepare_responses(&highest_ballot_responses, instance) {
8080
PrepareDecision::Commit(payload) => {
8181
// EPaxos Step 29: Run Commit phase
82-
let result =
83-
crate::ops::propose::commit(ctx, &config, replica_id, &quorum_members, payload)
84-
.await?;
82+
let result = crate::ops::propose::commit(
83+
ctx,
84+
&config,
85+
replica_id,
86+
&quorum_members,
87+
payload,
88+
false,
89+
)
90+
.await?;
8591
convert_proposal_result(result)
8692
}
8793
PrepareDecision::Accept(payload) => {
@@ -92,6 +98,7 @@ pub async fn epoxy_explicit_prepare(
9298
replica_id,
9399
&quorum_members,
94100
payload,
101+
false,
95102
)
96103
.await?;
97104
convert_proposal_result(result)
@@ -324,7 +331,12 @@ async fn restart_phase1(
324331
);
325332

326333
// Call the propose operation to restart consensus from Phase 1
327-
let result = ctx.op(crate::ops::propose::Input { proposal }).await?;
334+
let result = ctx
335+
.op(crate::ops::propose::Input {
336+
proposal,
337+
purge_cache: false,
338+
})
339+
.await?;
328340

329341
// Convert ProposalResult to ExplicitPrepareResult
330342
match result {

packages/services/epoxy/src/ops/propose.rs

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@ pub enum CommandError {
2424
#[derive(Debug)]
2525
pub struct Input {
2626
pub proposal: protocol::Proposal,
27+
/// Only works in non-workflow contexts.
28+
pub purge_cache: bool,
2729
}
2830

2931
#[operation]
@@ -67,10 +69,26 @@ pub async fn epoxy_propose(ctx: &OperationCtx, input: &Input) -> Result<Proposal
6769

6870
match path {
6971
Path::PathFast(protocol::PathFast { payload }) => {
70-
commit(ctx, &config, replica_id, &quorum_members, payload).await
72+
commit(
73+
ctx,
74+
&config,
75+
replica_id,
76+
&quorum_members,
77+
payload,
78+
input.purge_cache,
79+
)
80+
.await
7181
}
7282
Path::PathSlow(protocol::PathSlow { payload }) => {
73-
run_paxos_accept(ctx, &config, replica_id, &quorum_members, payload).await
83+
run_paxos_accept(
84+
ctx,
85+
&config,
86+
replica_id,
87+
&quorum_members,
88+
payload,
89+
input.purge_cache,
90+
)
91+
.await
7492
}
7593
}
7694
}
@@ -82,6 +100,7 @@ pub async fn run_paxos_accept(
82100
replica_id: ReplicaId,
83101
quorum_members: &[ReplicaId],
84102
payload: Payload,
103+
purge_cache: bool,
85104
) -> Result<ProposalResult> {
86105
// Clone payload for use after the closure
87106
let payload_for_accepts = payload.clone();
@@ -113,6 +132,7 @@ pub async fn run_paxos_accept(
113132
replica_id,
114133
&quorum_members,
115134
payload_for_accepts,
135+
purge_cache,
116136
)
117137
.await
118138
} else {
@@ -127,6 +147,7 @@ pub async fn commit(
127147
replica_id: ReplicaId,
128148
quorum_members: &[ReplicaId],
129149
payload: Payload,
150+
purge_cache: bool,
130151
) -> Result<ProposalResult> {
131152
// Commit locally
132153
//

packages/services/epoxy/tests/common/utils.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ pub async fn execute_command(
1414
proposal: protocol::Proposal {
1515
commands: vec![protocol::Command { kind: command }],
1616
},
17+
purge_cache: false,
1718
})
1819
.await?;
1920

packages/services/epoxy/tests/proposal.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ async fn proposal() {
2828
}),
2929
}],
3030
},
31+
purge_cache: false,
3132
})
3233
.await
3334
.unwrap();

packages/services/pegboard/src/workflows/actor/actor_keys.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -203,6 +203,7 @@ pub async fn propose(ctx: &ActivityCtx, input: &ProposeInput) -> Result<Proposal
203203
}),
204204
}],
205205
},
206+
purge_cache: false,
206207
})
207208
.await?;
208209

0 commit comments

Comments
 (0)