Skip to content

Commit 88e1336

Browse files
committed
fix(pb): implement events and commands for actor wfs
1 parent 7ddbf54 commit 88e1336

File tree

30 files changed

+1474
-493
lines changed

30 files changed

+1474
-493
lines changed

Cargo.lock

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

engine/packages/api-builder/src/middleware.rs

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -186,13 +186,7 @@ pub async fn http_logging_middleware(
186186
);
187187

188188
// Update metrics
189-
metrics::API_REQUEST_PENDING.add(
190-
-1,
191-
&[
192-
KeyValue::new("method", method_clone.to_string()),
193-
KeyValue::new("path", path_clone.clone()),
194-
],
195-
);
189+
metrics::API_REQUEST_PENDING.add(-1, &[KeyValue::new("method", method_clone.to_string()), KeyValue::new("path", path_clone.clone())]);
196190

197191
let error_code: String = if status.is_success() {
198192
String::new()

engine/packages/engine/src/commands/db/mod.rs

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -22,10 +22,6 @@ pub enum SubCommand {
2222
pub enum DatabaseType {
2323
#[clap(alias = "ch")]
2424
Clickhouse,
25-
#[clap(alias = "wfd")]
26-
WorkflowData,
27-
#[clap(alias = "wfi")]
28-
WorkflowInternal,
2925
}
3026

3127
impl SubCommand {
@@ -48,12 +44,6 @@ impl SubCommand {
4844
DatabaseType::Clickhouse => {
4945
crate::util::db::clickhouse_shell(config, shell_ctx).await?
5046
}
51-
DatabaseType::WorkflowData => {
52-
crate::util::db::wf_sqlite_shell(config, shell_ctx, false).await?
53-
}
54-
DatabaseType::WorkflowInternal => {
55-
crate::util::db::wf_sqlite_shell(config, shell_ctx, true).await?
56-
}
5747
}
5848

5949
Ok(())

engine/packages/engine/src/commands/start.rs

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -115,23 +115,23 @@ async fn verify_engine_version(
115115
pools
116116
.udb()?
117117
.run(|tx| async move {
118-
let current_version = semver::Version::parse(env!("CARGO_PKG_VERSION"))
119-
.context("failed to parse cargo pkg version as semver")?;
118+
let current_version = semver::Version::parse(env!("CARGO_PKG_VERSION")).context("failed to parse cargo pkg version as semver")?;
120119

121-
if let Some(existing_version) =
122-
tx.read_opt(&keys::EngineVersionKey {}, Serializable).await?
123-
{
120+
if let Some(existing_version) = tx.read_opt(&keys::EngineVersionKey {}, Serializable).await? {
124121
if current_version < existing_version {
125-
return Ok(Err(anyhow!("{}", formatdoc!(
126-
"
122+
return Ok(Err(anyhow!(
123+
"{}",
124+
formatdoc!(
125+
"
127126
Rivet Engine has been rolled back to a previous version:
128127
- Last Used Version: {existing_version}
129128
- Current Version: {current_version}
130129
Cannot proceed without potential data corruption.
131130
132131
(If you know what you're doing, this error can be disabled in the Rivet config via `allow_version_rollback: true`)
133132
"
134-
))));
133+
)
134+
)));
135135
}
136136
}
137137

engine/packages/engine/src/util/db.rs

Lines changed: 1 addition & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
1-
use std::{path::Path, result::Result::Ok, str::FromStr};
1+
use std::{path::Path, result::Result::Ok};
22

33
use anyhow::*;
4-
use rivet_util::Id;
54
use serde_json::json;
65

76
pub struct ShellQuery {
@@ -74,26 +73,3 @@ pub async fn clickhouse_shell(
7473

7574
Ok(())
7675
}
77-
78-
pub async fn wf_sqlite_shell(
79-
config: rivet_config::Config,
80-
shell_ctx: ShellContext<'_>,
81-
_internal: bool,
82-
) -> Result<()> {
83-
let ShellContext { queries, .. } = shell_ctx;
84-
85-
let _pools = rivet_pools::Pools::new(config.clone()).await?;
86-
87-
// Combine all queries into one command
88-
for ShellQuery {
89-
svc: workflow_id,
90-
query: _query,
91-
} in queries
92-
{
93-
let _workflow_id = Id::from_str(workflow_id).context("could not parse input as Id")?;
94-
95-
todo!();
96-
}
97-
98-
Ok(())
99-
}

engine/packages/guard-core/src/websocket_handle.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ use anyhow::*;
22
use futures_util::{SinkExt, StreamExt, stream::Peekable};
33
use hyper::upgrade::Upgraded;
44
use hyper_tungstenite::HyperWebsocket;
5-
use hyper_tungstenite::tungstenite::Message as WsMessage;
5+
use hyper_tungstenite::tungstenite::Message;
66
use hyper_util::rt::TokioIo;
77
use std::sync::Arc;
88
use tokio::sync::Mutex;
@@ -12,7 +12,7 @@ pub type WebSocketReceiver =
1212
Peekable<futures_util::stream::SplitStream<WebSocketStream<TokioIo<Upgraded>>>>;
1313

1414
pub type WebSocketSender =
15-
futures_util::stream::SplitSink<WebSocketStream<TokioIo<Upgraded>>, WsMessage>;
15+
futures_util::stream::SplitSink<WebSocketStream<TokioIo<Upgraded>>, Message>;
1616

1717
#[derive(Clone)]
1818
pub struct WebSocketHandle {
@@ -31,7 +31,7 @@ impl WebSocketHandle {
3131
})
3232
}
3333

34-
pub async fn send(&self, message: WsMessage) -> Result<()> {
34+
pub async fn send(&self, message: Message) -> Result<()> {
3535
self.ws_tx.lock().await.send(message).await?;
3636
Ok(())
3737
}

engine/packages/guard-core/tests/simple_websocket.rs

Lines changed: 19 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -175,25 +175,25 @@ async fn start_websocket_server() -> SocketAddr {
175175
match message_result {
176176
Ok(msg) => {
177177
match &msg {
178-
hyper_tungstenite::tungstenite::Message::Text(text) => {
179-
println!("Server: Received text message: {}", text);
180-
},
181-
hyper_tungstenite::tungstenite::Message::Binary(data) => {
182-
println!("Server: Received binary message of {} bytes", data.len());
183-
},
184-
hyper_tungstenite::tungstenite::Message::Ping(_) => {
185-
println!("Server: Received ping");
186-
},
187-
hyper_tungstenite::tungstenite::Message::Pong(_) => {
188-
println!("Server: Received pong");
189-
},
190-
hyper_tungstenite::tungstenite::Message::Close(_) => {
191-
println!("Server: Received close message");
192-
},
193-
_ => {
194-
println!("Server: Received unknown message type");
195-
}
196-
}
178+
hyper_tungstenite::tungstenite::Message::Text(text) => {
179+
println!("Server: Received text message: {}", text);
180+
}
181+
hyper_tungstenite::tungstenite::Message::Binary(data) => {
182+
println!("Server: Received binary message of {} bytes", data.len());
183+
}
184+
hyper_tungstenite::tungstenite::Message::Ping(_) => {
185+
println!("Server: Received ping");
186+
}
187+
hyper_tungstenite::tungstenite::Message::Pong(_) => {
188+
println!("Server: Received pong");
189+
}
190+
hyper_tungstenite::tungstenite::Message::Close(_) => {
191+
println!("Server: Received close message");
192+
}
193+
_ => {
194+
println!("Server: Received unknown message type");
195+
}
196+
}
197197

198198
println!("Server: Echoing message back");
199199
match write.send(msg).await {

engine/packages/guard/src/routing/pegboard_gateway.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -210,7 +210,7 @@ async fn route_request_inner(
210210
.to_workflow_id(actor.workflow_id)
211211
.graceful_not_found()
212212
.send()
213-
.await;
213+
.await?;
214214

215215
if res.is_none() {
216216
tracing::warn!(

engine/packages/metrics/src/buckets.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
pub const BUCKETS: &[f64] = &[
22
// For otel
33
0.0, // Added
4-
0.001, 0.0025,
5-
// Copied from https://docs.rs/prometheus/latest/src/prometheus/histogram.rs.html#25-27
4+
0.001,
5+
0.0025, // Copied from https://docs.rs/prometheus/latest/src/prometheus/histogram.rs.html#25-27
66
0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0, // Added
77
25.0, 50.0, 100.0, 250.0, 500.0,
88
];

engine/packages/pegboard-gateway/src/keepalive_task.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ pub async fn task(
5252
ctx.op(pegboard::ops::actor::hibernating_request::upsert::Input {
5353
actor_id,
5454
gateway_id,
55-
request_id,
55+
request_id
5656
}),
5757
// Keep alive in flight req during hibernation
5858
shared_state.keepalive_hws(request_id),

0 commit comments

Comments
 (0)