From 8187d00b3c0ff03a33856464acfe1550b91970cf Mon Sep 17 00:00:00 2001 From: Mike Hilgendorf Date: Mon, 9 Feb 2026 14:52:48 -0600 Subject: [PATCH 1/5] perf(server): make process cycle-detection asynchronous - move cycle detection to the watchdog - (fix) allow update_parent_depth to handle cycles in the process graph Resolves #770 --- packages/server/src/database/postgres.sql | 16 +- packages/server/src/database/sqlite.sql | 3 + packages/server/src/process/spawn.rs | 89 +++------- packages/server/src/watchdog.rs | 205 +++++++++++++++++++++- 4 files changed, 235 insertions(+), 78 deletions(-) diff --git a/packages/server/src/database/postgres.sql b/packages/server/src/database/postgres.sql index b8160957d..7b402dc31 100644 --- a/packages/server/src/database/postgres.sql +++ b/packages/server/src/database/postgres.sql @@ -46,16 +46,18 @@ as $$ declare current_ids text[]; updated_ids text[]; + visited_ids text[] := '{}'; begin current_ids := changed_process_ids; while array_length(current_ids, 1) is not null and array_length(current_ids, 1) > 0 loop - -- Update parents based on their children's depths + -- Update parents based on their children's depths, excluding already visited parents to handle cycles. with child_depths as ( select process_children.process, max(processes.depth) as max_child_depth from process_children join processes on processes.id = process_children.child where process_children.child = any(current_ids) + and not process_children.process = any(visited_ids) group by process_children.process ), updated as ( @@ -69,10 +71,13 @@ begin select coalesce(array_agg(id), '{}') into updated_ids from updated; - -- Exit if no parents were updated + -- Mark the current batch as visited. + visited_ids := visited_ids || current_ids; + + -- Exit if no parents were updated. exit when cardinality(updated_ids) = 0; - -- Continue with the updated parents + -- Continue with the updated parents. 
current_ids := updated_ids; end loop; end; @@ -90,8 +95,9 @@ create index process_tokens_token_index on process_tokens (token); create table process_children ( process text not null, child text not null, - position int8 not null, + cycle int8, options text, + position int8 not null, token text ); @@ -101,6 +107,8 @@ create index process_children_index on process_children (process, position); create index process_children_child_process_index on process_children (child, process); +create index process_children_cycle_index on process_children (cycle) where cycle is null; + create table remotes ( name text primary key, url text not null diff --git a/packages/server/src/database/sqlite.sql b/packages/server/src/database/sqlite.sql index e0d44dd22..294faa15d 100644 --- a/packages/server/src/database/sqlite.sql +++ b/packages/server/src/database/sqlite.sql @@ -50,6 +50,7 @@ create index process_tokens_token_index on process_tokens (token); create table process_children ( process text not null, child text not null, + cycle integer, options text, position integer not null, token text @@ -61,6 +62,8 @@ create unique index process_children_process_position_index on process_children create index process_children_child_index on process_children (child); +create index process_children_cycle_index on process_children (cycle) where cycle is null; + create table remotes ( name text primary key, url text not null diff --git a/packages/server/src/process/spawn.rs b/packages/server/src/process/spawn.rs index 88bd749fd..3ee88d7da 100644 --- a/packages/server/src/process/spawn.rs +++ b/packages/server/src/process/spawn.rs @@ -5,7 +5,7 @@ use { stream::{self, BoxStream, FuturesUnordered}, }, indoc::{formatdoc, indoc}, - std::{fmt::Write, pin::pin}, + std::pin::pin, tangram_client::prelude::*, tangram_database::{self as db, prelude::*}, tangram_futures::{stream::Ext as _, task::Task}, @@ -973,72 +973,6 @@ impl Server { ) -> tg::Result<()> { let p = transaction.p(); - // Determine if 
adding this child process creates a cycle. - let statement = formatdoc!( - " - with recursive ancestors as ( - select {p}1 as id - union all - select process_children.process as id - from ancestors - join process_children on ancestors.id = process_children.child - ) - select exists( - select 1 from ancestors where id = {p}2 - ); - " - ); - let params = db::params![parent.to_string(), child.to_string()]; - let cycle = transaction - .query_one_value_into::(statement.into(), params) - .await - .map_err(|source| tg::error!(!source, "failed to execute the cycle check"))?; - - // If adding this child creates a cycle, return an error. - if cycle { - // Try to reconstruct the cycle path by walking from the child through its - // descendants until we find a path back to the parent. - let statement = formatdoc!( - " - with recursive reachable (current_process, path) as ( - select {p}2, {p}2 - - union - - select pc.child, r.path || ' ' || pc.child - from reachable r - join process_children pc on r.current_process = pc.process - where r.path not like '%' || pc.child || '%' - ) - select - {p}1 || ' ' || path as cycle - from reachable - where current_process = {p}1 - limit 1; - " - ); - let params = db::params![parent.to_string(), child.to_string()]; - let cycle = transaction - .query_one_value_into::(statement.into(), params) - .await - .inspect_err(|error| tracing::error!(?error, "failed to get the cycle")) - .ok(); - let mut message = String::from("adding this child process creates a cycle"); - if let Some(cycle) = cycle { - let processes = cycle.split(' ').collect::>(); - for i in 0..processes.len() - 1 { - let parent = processes[i]; - let child = processes[i + 1]; - if i == 0 { - write!(&mut message, "\n{parent} tried to add child {child}").unwrap(); - } else { - write!(&mut message, "\n{parent} has child {child}").unwrap(); - } - } - } - return Err(tg::error!("{message}")); - } - // Add the child to the database. 
let statement = formatdoc!( " @@ -1090,6 +1024,7 @@ impl Server { child_ids: Vec, ) -> tg::Result<()> { let mut current_ids = child_ids; + let mut visited = std::collections::HashSet::new(); while !current_ids.is_empty() { let mut updated_ids = Vec::new(); @@ -1119,6 +1054,10 @@ impl Server { // Update each parent's depth if needed. for parent in parents { + // Skip parents that have already been visited to avoid infinite loops from cycles. + if visited.contains(&parent.process) { + continue; + } if let Some(max_child_depth) = parent.max_child_depth { let statement = indoc!( " @@ -1144,6 +1083,9 @@ impl Server { } } + // Mark the current batch as visited. + visited.extend(current_ids); + // Exit if no parents were updated. if updated_ids.is_empty() { break; @@ -1169,6 +1111,19 @@ impl Server { .ok(); } }); + tokio::spawn({ + let server = self.clone(); + async move { + server + .messenger + .publish("watchdog".into(), ()) + .await + .inspect_err(|error| { + tracing::error!(?error, "failed to publish the watchdog message"); + }) + .ok(); + } + }); } fn spawn_process_parent_permit_task( diff --git a/packages/server/src/watchdog.rs b/packages/server/src/watchdog.rs index 6781fb50e..92fc71e8f 100644 --- a/packages/server/src/watchdog.rs +++ b/packages/server/src/watchdog.rs @@ -1,9 +1,12 @@ use { crate::Server, - futures::{FutureExt as _, StreamExt as _, stream::FuturesUnordered}, + futures::{ + FutureExt as _, StreamExt as _, TryFutureExt, TryStreamExt, future, + stream::FuturesUnordered, + }, indoc::formatdoc, num::ToPrimitive as _, - std::{collections::BTreeMap, pin::pin}, + std::{collections::BTreeMap, fmt::Write, pin::pin}, tangram_client::prelude::*, tangram_database::{self as db, prelude::*}, tangram_messenger::prelude::*, @@ -13,12 +16,20 @@ impl Server { pub async fn watchdog_task(&self, config: &crate::config::Watchdog) -> tg::Result<()> { loop { // Finish processes. 
- let result = self.watchdog_task_inner(config).await.inspect_err( - |error| tracing::error!(error = %error.trace(), "failed to finish processes"), - ); + let expired_future = self + .finish_expired_processes(config) + .inspect_err( + |error| tracing::error!(error = %error.trace(), "failed to finish processes"), + ); + let cycle_future = self + .finish_cyclic_processes(config) + .inspect_err( + |error| tracing::error!(error = %error.trace(), "failed to finish processes"), + ); + let result = future::join(expired_future, cycle_future).await; // If an error occurred or no processes were finished, wait to be signaled or for the timeout to expire. - if matches!(result, Err(_) | Ok(0)) { + if matches!(result, (Err(_), Err(_)) | (Ok(0), Ok(0))) { let stream = self .messenger .subscribe::<()>("watchdog".into(), None) @@ -37,7 +48,7 @@ impl Server { } } - pub(crate) async fn watchdog_task_inner( + pub(crate) async fn finish_expired_processes( &self, config: &crate::config::Watchdog, ) -> tg::Result { @@ -130,4 +141,184 @@ impl Server { Ok(n) } + + async fn finish_cyclic_processes(&self, config: &crate::config::Watchdog) -> tg::Result { + // Get a database connection. + let connection = self + .database + .connection() + .await + .map_err(|source| tg::error!(!source, "failed to get a database connection"))?; + let p = connection.p(); + + // Get processes to finish. 
+ #[derive(Debug, db::row::Deserialize)] + struct Row { + #[tangram_database(as = "db::value::FromStr")] + process: tg::process::Id, + + #[tangram_database(as = "db::value::FromStr")] + child: tg::process::Id, + } + let statement = formatdoc!( + " + select process, child + from process_children + where cycle is null + limit {p}1; + " + ); + let params = db::params![config.batch_size]; + let rows = connection + .query_all_into::(statement.into(), params) + .await + .map_err(|source| tg::error!(!source, "failed to execute the statement"))?; + drop(connection); + let n = rows.len().to_u64().unwrap(); + rows.into_iter() + .map(|row| { + let server = self.clone(); + async move { + let Some(error) = server.try_detect_cycle(&row.process, &row.child).await? + else { + return Ok(()); + }; + let arg = tg::process::finish::Arg { + checksum: None, + error: Some(tg::Either::Left(error)), + exit: 1, + local: None, + output: None, + remotes: None, + }; + server + .finish_process(&row.child, arg) + .await + .inspect_err(|error| { + tracing::error!(?error, "failed to cancel the process"); + }) + .ok(); + Ok::<_, tg::Error>(()) + } + }) + .collect::>() + .try_collect::<()>() + .await + .map_err(|source| tg::error!(!source, "cycle detection failed"))?; + Ok(n) + } + + async fn try_detect_cycle( + &self, + parent: &tg::process::Id, + child: &tg::process::Id, + ) -> tg::Result> { + let connection = self + .database + .connection() + .await + .map_err(|source| tg::error!(!source, "failed to get a database connection"))?; + let p = connection.p(); + + // Determine if adding this child process creates a cycle. 
+ let statement = formatdoc!( + " + with recursive ancestors as ( + select {p}1 as id + union all + select process_children.process as id + from ancestors + join process_children on ancestors.id = process_children.child + ) + select exists( + select 1 from ancestors where id = {p}2 + ); + " + ); + let params = db::params![parent.to_string(), child.to_string()]; + let cycle = connection + .query_one_value_into::(statement.into(), params) + .await + .map_err(|source| tg::error!(!source, "failed to execute the cycle check"))?; + + // Upgrade to a write connection. + drop(connection); + let connection = self + .database + .write_connection() + .await + .map_err(|source| tg::error!(!source, "failed to get a database connection"))?; + let p = connection.p(); + + // Mark the edge as forming a cycle. + let statement = formatdoc!( + " + update process_children set cycle = {p}3 where process = {p}1 and child = {p}2; + " + ); + let params = db::params![parent.to_string(), child.to_string(), cycle]; + connection + .execute(statement.into(), params) + .await + .map_err(|source| tg::error!(!source, "failed to perform the query"))?; + + if !cycle { + return Ok(None); + } + + // Construct a good error message by collecting the entire cycle. + let mut message = String::from("adding this child process creates a cycle"); + + // Try to reconstruct the cycle path by walking from the child through its + // descendants until we find a path back to the parent. 
+ let statement = formatdoc!( + " + with recursive reachable (current_process, path) as ( + select {p}2, {p}2 + + union + + select pc.child, r.path || ' ' || pc.child + from reachable r + join process_children pc on r.current_process = pc.process + where r.path not like '%' || pc.child || '%' + ) + select + {p}1 || ' ' || path as cycle + from reachable + where current_process = {p}1 + limit 1; + " + ); + let params = db::params![parent.to_string(), child.to_string()]; + let cycle = connection + .query_one_value_into::(statement.into(), params) + .await + .inspect_err(|error| tracing::error!(?error, "failed to get the cycle")) + .ok(); + + // Format the error message. + if let Some(cycle) = cycle { + let processes = cycle.split(' ').collect::>(); + for i in 0..processes.len() - 1 { + let parent = processes[i]; + let child = processes[i + 1]; + if i == 0 { + write!(&mut message, "\n{parent} tried to add child {child}").unwrap(); + } else { + write!(&mut message, "\n{parent} has child {child}").unwrap(); + } + } + } + let error = tg::error::Data { + code: None, + diagnostics: None, + location: None, + message: Some(message), + source: None, + stack: None, + values: BTreeMap::new(), + }; + Ok(Some(error)) + } } From 0560454138fc61ac63e1f8bb47aff1cc56c5faf4 Mon Sep 17 00:00:00 2001 From: Mike Hilgendorf Date: Tue, 10 Feb 2026 10:12:15 -0600 Subject: [PATCH 2/5] fix(messenger): support delivery to multiple consumers --- packages/messenger/src/memory.rs | 23 +++++++++++++++++++---- 1 file changed, 19 insertions(+), 4 deletions(-) diff --git a/packages/messenger/src/memory.rs b/packages/messenger/src/memory.rs index 5d9538f28..9356f73c9 100644 --- a/packages/messenger/src/memory.rs +++ b/packages/messenger/src/memory.rs @@ -45,10 +45,15 @@ struct StreamState { closed: bool, config: StreamConfig, consumers: HashMap, - messages: BTreeMap>, + messages: BTreeMap, sequence: u64, } +struct MessageState { + payload: Arc, + acks_remaining: usize, +} + #[derive(Debug)] struct 
ConsumerState { #[expect(dead_code)] @@ -324,7 +329,12 @@ impl Stream { state.sequence += 1; let sequence = state.sequence; let payload: Arc = Arc::new(payload); - state.messages.insert(sequence, payload); + let acks_remaining = state.consumers.len(); + let message_state = MessageState { + payload, + acks_remaining, + }; + state.messages.insert(sequence, message_state); sequences.push(sequence); } @@ -452,7 +462,7 @@ impl Consumer { .iter() .skip_while(|(sequence, _)| **sequence <= consumer_sequence) .take(max_messages) - .map(|(sequence, payload)| (*sequence, payload.clone())) + .map(|(sequence, state)| (*sequence, Arc::clone(&state.payload))) .collect::>(); // Set the consumer's sequence number. @@ -506,7 +516,12 @@ impl Consumer { .upgrade() .ok_or_else(|| Error::other("the stream was destroyed"))?; let mut state = stream.state.write().unwrap(); - state.messages.remove(&sequence); + if let Some(message_state) = state.messages.get_mut(&sequence) { + message_state.acks_remaining = message_state.acks_remaining.saturating_sub(1); + if message_state.acks_remaining == 0 { + state.messages.remove(&sequence); + } + } Ok(()) } } From 12b20b578ee54d5c30e89769d50c519e479aa538 Mon Sep 17 00:00:00 2001 From: Mike Hilgendorf Date: Tue, 10 Feb 2026 10:17:34 -0600 Subject: [PATCH 3/5] use additional consumer and stored procedure in postgres --- packages/server/src/database/postgres.sql | 58 ++- packages/server/src/database/sqlite.sql | 3 - packages/server/src/lib.rs | 14 +- packages/server/src/process/queue.rs | 11 +- packages/server/src/process/spawn.rs | 16 +- packages/server/src/watchdog.rs | 456 ++++++++++++++-------- 6 files changed, 365 insertions(+), 193 deletions(-) diff --git a/packages/server/src/database/postgres.sql b/packages/server/src/database/postgres.sql index 7b402dc31..6398117d9 100644 --- a/packages/server/src/database/postgres.sql +++ b/packages/server/src/database/postgres.sql @@ -83,6 +83,61 @@ begin end; $$; +create or replace function 
get_cyclic_processes( + parent_ids text[], + child_ids text[] +) +returns table(process_id text, cycle_path text) +language plpgsql +as $$ +declare + i int; + p_id text; + c_id text; + is_cycle boolean; + found_path text; +begin + for i in 1..coalesce(array_length(parent_ids, 1), 0) loop + p_id := parent_ids[i]; + c_id := child_ids[i]; + + -- Check if adding c_id as a child of p_id creates a cycle by walking ancestors of p_id. + select exists( + with recursive ancestors as ( + select p_id as ancestor_id + union + select pc.process + from ancestors a + join process_children pc on a.ancestor_id = pc.child + ) + select 1 from ancestors where ancestor_id = c_id + ) into is_cycle; + + if not is_cycle then + continue; + end if; + + -- Reconstruct the cycle path by walking descendants of c_id to find p_id. + select p_id || ' ' || r.path into found_path + from ( + with recursive reachable(current_process, path) as ( + select c_id, c_id::text + union + select pc.child, r.path || ' ' || pc.child + from reachable r + join process_children pc on r.current_process = pc.process + where r.path not like '%' || pc.child || '%' + ) + select current_process, path from reachable where current_process = p_id limit 1 + ) r; + + process_id := c_id; + cycle_path := found_path; + return next; + end loop; +end; +$$; + create table process_tokens ( process text not null, token text not null @@ -95,7 +150,6 @@ create index process_tokens_token_index on process_tokens (token); create table process_children ( process text not null, child text not null, - cycle int8, options text, position int8 not null, token text @@ -107,8 +161,6 @@ create index process_children_index on process_children (process, position); create index process_children_child_process_index on process_children (child, process); -create index process_children_cycle_index on process_children (cycle) where cycle is null; - create table remotes ( name text primary key, url text not null diff --git 
a/packages/server/src/database/sqlite.sql b/packages/server/src/database/sqlite.sql index 294faa15d..e0d44dd22 100644 --- a/packages/server/src/database/sqlite.sql +++ b/packages/server/src/database/sqlite.sql @@ -50,7 +50,6 @@ create index process_tokens_token_index on process_tokens (token); create table process_children ( process text not null, child text not null, - cycle integer, options text, position integer not null, token text @@ -62,8 +61,6 @@ create unique index process_children_process_position_index on process_children create index process_children_child_index on process_children (child); -create index process_children_cycle_index on process_children (cycle) where cycle is null; - create table remotes ( name text primary key, url text not null diff --git a/packages/server/src/lib.rs b/packages/server/src/lib.rs index d651fc121..05d6cfabf 100644 --- a/packages/server/src/lib.rs +++ b/packages/server/src/lib.rs @@ -484,7 +484,11 @@ impl Server { deliver: tangram_messenger::DeliverPolicy::All, }; stream - .create_consumer("queue".to_owned(), consumer_config) + .create_consumer("queue".to_owned(), consumer_config.clone()) + .await + .map_err(|source| tg::error!(!source, "failed to create the queue consumer"))?; + stream + .create_consumer("cycle_detector".to_owned(), consumer_config) .await .map_err(|source| tg::error!(!source, "failed to create the queue consumer"))?; } @@ -797,13 +801,7 @@ impl Server { let server = server.clone(); let config = config.clone(); async move { - server - .watchdog_task(&config) - .await - .inspect_err( - |error| tracing::error!(error = %error.trace(), "the watchdog task failed"), - ) - .ok(); + server.watchdog_task(&config).await; } }) }); diff --git a/packages/server/src/process/queue.rs b/packages/server/src/process/queue.rs index d91582c36..b435f73a5 100644 --- a/packages/server/src/process/queue.rs +++ b/packages/server/src/process/queue.rs @@ -21,7 +21,10 @@ use { )] pub struct Message { #[tangram_serialize(id = 0)] - 
pub id: tg::process::Id, + pub process: tg::process::Id, + + #[tangram_serialize(id = 1)] + pub parent: Option, } impl Server { @@ -78,7 +81,7 @@ impl Server { " ); let now = time::OffsetDateTime::now_utc().unix_timestamp(); - let params = db::params![payload.id.to_string(), now]; + let params = db::params![payload.process.to_string(), now]; let result = connection .execute(statement.into(), params) .await @@ -96,12 +99,12 @@ impl Server { } // Publish a message that the status changed. - let subject = format!("processes.{}.status", payload.id); + let subject = format!("processes.{}.status", payload.process); self.messenger.publish(subject, ()).await.ok(); // Return the dequeued process. let output = tg::process::queue::Output { - process: payload.id, + process: payload.process, }; return Ok(Some(output)); diff --git a/packages/server/src/process/spawn.rs b/packages/server/src/process/spawn.rs index 3ee88d7da..60b1a362a 100644 --- a/packages/server/src/process/spawn.rs +++ b/packages/server/src/process/spawn.rs @@ -193,7 +193,8 @@ impl Server { self.spawn_process_task(&process, permit, clean_guard); } else { let payload = crate::process::queue::Message { - id: output.id.clone(), + process: output.id.clone(), + parent: arg.parent.clone(), }; self.messenger .stream_publish("queue".into(), payload) @@ -1111,19 +1112,6 @@ impl Server { .ok(); } }); - tokio::spawn({ - let server = self.clone(); - async move { - server - .messenger - .publish("watchdog".into(), ()) - .await - .inspect_err(|error| { - tracing::error!(?error, "failed to publish the watchdog message"); - }) - .ok(); - } - }); } fn spawn_process_parent_permit_task( diff --git a/packages/server/src/watchdog.rs b/packages/server/src/watchdog.rs index 92fc71e8f..37111d652 100644 --- a/packages/server/src/watchdog.rs +++ b/packages/server/src/watchdog.rs @@ -1,5 +1,5 @@ use { - crate::Server, + crate::{Server, database}, futures::{ FutureExt as _, StreamExt as _, TryFutureExt, TryStreamExt, future, 
stream::FuturesUnordered, @@ -9,37 +9,67 @@ use { std::{collections::BTreeMap, fmt::Write, pin::pin}, tangram_client::prelude::*, tangram_database::{self as db, prelude::*}, - tangram_messenger::prelude::*, + tangram_messenger::{BatchConfig, prelude::*}, }; impl Server { - pub async fn watchdog_task(&self, config: &crate::config::Watchdog) -> tg::Result<()> { + pub async fn watchdog_task(&self, config: &crate::config::Watchdog) { + let expired_processes = tokio::spawn({ + let server = self.clone(); + let config = config.clone(); + async move { server.expired_process_task(&config).await } + }); + let cyclic_processes = tokio::spawn({ + let server = self.clone(); + let config = config.clone(); + async move { server.cyclic_processes_task(&config).await } + }); + match future::select(expired_processes, cyclic_processes).await { + future::Either::Left((result, task)) => { + if let Err(error) = result { + tracing::error!(?error, "watchdog task panicked"); + } + task.await + .inspect_err(|error| tracing::error!(?error, "watchdog task panicked")) + .ok(); + }, + future::Either::Right((result, task)) => { + if let Err(error) = result { + tracing::error!(?error, "watchdog task panicked"); + } + task.await + .inspect_err(|error| tracing::error!(?error, "watchdog task panicked")) + .ok(); + }, + } + } + + async fn expired_process_task(&self, config: &crate::config::Watchdog) { loop { // Finish processes. - let expired_future = self - .finish_expired_processes(config) + let result = self + .expired_process_task_inner(config) .inspect_err( |error| tracing::error!(error = %error.trace(), "failed to finish processes"), - ); - let cycle_future = self - .finish_cyclic_processes(config) - .inspect_err( - |error| tracing::error!(error = %error.trace(), "failed to finish processes"), - ); - let result = future::join(expired_future, cycle_future).await; + ) + .await; // If an error occurred or no processes were finished, wait to be signaled or for the timeout to expire. 
- if matches!(result, (Err(_), Err(_)) | (Ok(0), Ok(0))) { - let stream = self + if matches!(result, Err(_) | Ok(0)) { + let Ok(stream) = self .messenger .subscribe::<()>("watchdog".into(), None) .await - .map_err(|source| { - tg::error!( - !source, - "failed to subscribe to the cancellation message stream" - ) - })?; + .inspect_err(|error| { + tracing::error!( + ?error, + "failed to subscribe to the watchdog message stream" + ); + }) + else { + tokio::time::sleep(config.interval).await; + continue; + }; let mut stream = pin!(stream); tokio::time::timeout(config.interval, stream.next()) .await @@ -48,7 +78,7 @@ impl Server { } } - pub(crate) async fn finish_expired_processes( + async fn expired_process_task_inner( &self, config: &crate::config::Watchdog, ) -> tg::Result { @@ -142,47 +172,100 @@ impl Server { Ok(n) } - async fn finish_cyclic_processes(&self, config: &crate::config::Watchdog) -> tg::Result { - // Get a database connection. - let connection = self + async fn cyclic_processes_task(&self, config: &crate::config::Watchdog) { + // Subscribe to the consumer stream. 
+ let Ok(stream) = self + .messenger + .get_stream("queue".into()) + .await + .inspect_err(|error| tracing::error!(?error, "failed to get the stream")) + else { + return; + }; + let Ok(consumer) = stream + .get_consumer("cycle_detector".into()) + .await + .inspect_err(|error| tracing::error!(?error, "failed to get the consumer")) + else { + return; + }; + let Ok(stream) = consumer + .batch_subscribe::(BatchConfig { + max_messages: Some(config.batch_size.to_u64().unwrap()), + max_bytes: None, + timeout: Some(config.ttl), + }) + .await + .inspect_err(|error| tracing::error!(?error, "failed to subscribe")) + else { + return; + }; + let stream = stream.ready_chunks(config.batch_size); + let mut stream = pin!(stream); + let mut messages = Vec::with_capacity(config.batch_size); + let mut ackers = Vec::with_capacity(config.batch_size); + while let Some(chunk) = stream.next().await { + messages.clear(); + ackers.clear(); + for result in chunk { + let Ok(message) = result.inspect_err(|error| { + tracing::error!(?error, "error reading from process queue stream"); + }) else { + continue; + }; + let (message, acker) = message.split(); + messages.push(message); + ackers.push(acker); + } + self.cyclic_processes_task_inner(&messages) + .await + .inspect_err(|error| tracing::error!(?error, "failed to finish cyclic processes")) + .ok(); + while let Some(acker) = ackers.pop() { + acker.ack().await.ok(); + } + } + } + + async fn cyclic_processes_task_inner( + &self, + messages: &[crate::process::queue::Message], + ) -> tg::Result<()> { + let mut connection = self .database .connection() .await .map_err(|source| tg::error!(!source, "failed to get a database connection"))?; - let p = connection.p(); - - // Get processes to finish. 
-        #[derive(Debug, db::row::Deserialize)]
-        struct Row {
-            #[tangram_database(as = "db::value::FromStr")]
-            process: tg::process::Id,
+        let transaction = connection
+            .transaction()
+            .await
+            .map_err(|source| tg::error!(!source, "failed to begin a transaction"))?;
+        let results = match &transaction {
+            #[cfg(feature = "postgres")]
+            database::Transaction::Postgres(transaction) => {
+                Self::get_cyclic_processes_postgres(transaction, messages)
+                    .await
+                    .map_err(|source| tg::error!(!source, "failed to get cyclic processes"))?
+            },
+            #[cfg(feature = "sqlite")]
+            database::Transaction::Sqlite(transaction) => {
+                Self::get_cyclic_processes_sqlite(transaction, messages)
+                    .await
+                    .map_err(|source| tg::error!(!source, "failed to get cyclic processes"))?
+            },
+        };
 
-            #[tangram_database(as = "db::value::FromStr")]
-            child: tg::process::Id,
-        }
-        let statement = formatdoc!(
-            "
-                select process, child
-                from process_children
-                where cycle is null
-                limit {p}1;
-            "
-        );
-        let params = db::params![config.batch_size];
-        let rows = connection
-            .query_all_into::(statement.into(), params)
+        transaction
+            .commit()
             .await
-            .map_err(|source| tg::error!(!source, "failed to execute the statement"))?;
+            .map_err(|source| tg::error!(!source, "failed to commit the transaction"))?;
         drop(connection);
-        let n = rows.len().to_u64().unwrap();
-        rows.into_iter()
-            .map(|row| {
+
+        results
+            .into_iter()
+            .map(|(id, error)| {
                 let server = self.clone();
                 async move {
-                    let Some(error) = server.try_detect_cycle(&row.process, &row.child).await? 
- else { - return Ok(()); - }; let arg = tg::process::finish::Arg { checksum: None, error: Some(tg::Either::Left(error)), @@ -191,134 +274,185 @@ impl Server { output: None, remotes: None, }; - server - .finish_process(&row.child, arg) - .await - .inspect_err(|error| { - tracing::error!(?error, "failed to cancel the process"); - }) - .ok(); - Ok::<_, tg::Error>(()) + server.finish_process(&id, arg).await } }) .collect::>() .try_collect::<()>() .await - .map_err(|source| tg::error!(!source, "cycle detection failed"))?; - Ok(n) + .map_err(|source| tg::error!(!source, "failed to finish processes"))?; + + Ok(()) } - async fn try_detect_cycle( - &self, - parent: &tg::process::Id, - child: &tg::process::Id, - ) -> tg::Result> { - let connection = self - .database - .connection() - .await - .map_err(|source| tg::error!(!source, "failed to get a database connection"))?; - let p = connection.p(); + #[cfg(feature = "postgres")] + async fn get_cyclic_processes_postgres( + transaction: &db::postgres::Transaction<'_>, + messages: &[crate::process::queue::Message], + ) -> tg::Result> { + // Build the parameter arrays dynamically. + let mut parent_placeholders = Vec::new(); + let mut child_placeholders = Vec::new(); + let mut params = Vec::new(); + let mut idx = 1; + for message in messages { + let Some(parent) = &message.parent else { + continue; + }; + parent_placeholders.push(format!("${idx}")); + idx += 1; + child_placeholders.push(format!("${idx}")); + idx += 1; + params.push(db::Value::Text(parent.to_string())); + params.push(db::Value::Text(message.process.to_string())); + } + if parent_placeholders.is_empty() { + return Ok(Vec::new()); + } - // Determine if adding this child process creates a cycle. 
- let statement = formatdoc!( - " - with recursive ancestors as ( - select {p}1 as id - union all - select process_children.process as id - from ancestors - join process_children on ancestors.id = process_children.child - ) - select exists( - select 1 from ancestors where id = {p}2 - ); - " + // Call the stored function with a single query. + #[derive(db::row::Deserialize)] + struct Row { + process_id: String, + cycle_path: Option, + } + let statement = format!( + "select * from get_cyclic_processes(array[{}]::text[], array[{}]::text[]);", + parent_placeholders.join(", "), + child_placeholders.join(", "), ); - let params = db::params![parent.to_string(), child.to_string()]; - let cycle = connection - .query_one_value_into::(statement.into(), params) + let rows = transaction + .query_all_into::(statement.into(), params) .await - .map_err(|source| tg::error!(!source, "failed to execute the cycle check"))?; + .map_err(|source| tg::error!(!source, "failed to get cyclic processes"))?; - // Upgrade to a write connection. - drop(connection); - let connection = self - .database - .write_connection() - .await - .map_err(|source| tg::error!(!source, "failed to get a database connection"))?; - let p = connection.p(); + // Build the results with formatted error messages. 
+ let mut results = Vec::new(); + for row in rows { + let id: tg::process::Id = row + .process_id + .parse() + .map_err(|source| tg::error!(!source, "failed to parse the process id"))?; + let mut message = String::from("adding this child process creates a cycle"); + if let Some(cycle) = row.cycle_path { + let processes = cycle.split(' ').collect::>(); + for i in 0..processes.len() - 1 { + let parent = processes[i]; + let child = processes[i + 1]; + if i == 0 { + write!(&mut message, "\n{parent} tried to add child {child}").unwrap(); + } else { + write!(&mut message, "\n{parent} has child {child}").unwrap(); + } + } + } + let error = tg::error::Data { + code: None, + diagnostics: None, + location: None, + message: Some(message), + source: None, + stack: None, + values: BTreeMap::new(), + }; + results.push((id, error)); + } + Ok(results) + } - // Mark the edge as forming a cycle. - let statement = formatdoc!( - " - update process_children set cycle = {p}3 where process = {p}1 and child = {p}2; - " - ); - let params = db::params![parent.to_string(), child.to_string(), cycle]; - connection - .execute(statement.into(), params) - .await - .map_err(|source| tg::error!(!source, "failed to perform the query"))?; + #[cfg(feature = "sqlite")] + async fn get_cyclic_processes_sqlite( + transaction: &db::sqlite::Transaction<'_>, + messages: &[crate::process::queue::Message], + ) -> tg::Result> { + let mut results = Vec::new(); + let p = transaction.p(); - if !cycle { - return Ok(None); - } + for message in messages { + let Some(parent) = &message.parent else { + continue; + }; + let child = &message.process; + let params = db::params![parent.to_string(), child.to_string()]; + let statement = formatdoc!( + " + with recursive ancestors as ( + select {p}1 as id + union all + select process_children.process as id + from ancestors + join process_children on ancestors.id = process_children.child + ) + select exists( + select 1 from ancestors where id = {p}2 + ); + " + ); + let cycle = 
transaction + .query_one_value_into::(statement.into(), params) + .await + .map_err(|source| tg::error!(!source, "failed to execute the cycle check"))?; - // Construct a good error message by collecting the entire cycle. - let mut message = String::from("adding this child process creates a cycle"); + if !cycle { + continue; + } - // Try to reconstruct the cycle path by walking from the child through its - // descendants until we find a path back to the parent. - let statement = formatdoc!( - " - with recursive reachable (current_process, path) as ( - select {p}2, {p}2 + // Construct a good error message by collecting the entire cycle. + let mut message = String::from("adding this child process creates a cycle"); - union + // Try to reconstruct the cycle path by walking from the child through its + // descendants until we find a path back to the parent. + let statement = formatdoc!( + " + with recursive reachable (current_process, path) as ( + select {p}2, {p}2 - select pc.child, r.path || ' ' || pc.child - from reachable r - join process_children pc on r.current_process = pc.process - where r.path not like '%' || pc.child || '%' - ) - select - {p}1 || ' ' || path as cycle - from reachable - where current_process = {p}1 - limit 1; - " - ); - let params = db::params![parent.to_string(), child.to_string()]; - let cycle = connection - .query_one_value_into::(statement.into(), params) - .await - .inspect_err(|error| tracing::error!(?error, "failed to get the cycle")) - .ok(); + union + + select pc.child, r.path || ' ' || pc.child + from reachable r + join process_children pc on r.current_process = pc.process + where r.path not like '%' || pc.child || '%' + ) + select + {p}1 || ' ' || path as cycle + from reachable + where current_process = {p}1 + limit 1; + " + ); + let params = db::params![parent.to_string(), child.to_string()]; + let cycle = transaction + .query_one_value_into::(statement.into(), params) + .await + .inspect_err(|error| tracing::error!(?error, "failed 
to get the cycle")) + .ok(); - // Format the error message. - if let Some(cycle) = cycle { - let processes = cycle.split(' ').collect::>(); - for i in 0..processes.len() - 1 { - let parent = processes[i]; - let child = processes[i + 1]; - if i == 0 { - write!(&mut message, "\n{parent} tried to add child {child}").unwrap(); - } else { - write!(&mut message, "\n{parent} has child {child}").unwrap(); + // Format the error message. + if let Some(cycle) = cycle { + let processes = cycle.split(' ').collect::>(); + for i in 0..processes.len() - 1 { + let parent = processes[i]; + let child = processes[i + 1]; + if i == 0 { + write!(&mut message, "\n{parent} tried to add child {child}").unwrap(); + } else { + write!(&mut message, "\n{parent} has child {child}").unwrap(); + } } } + let error = tg::error::Data { + code: None, + diagnostics: None, + location: None, + message: Some(message), + source: None, + stack: None, + values: BTreeMap::new(), + }; + results.push((child.clone(), error)); } - let error = tg::error::Data { - code: None, - diagnostics: None, - location: None, - message: Some(message), - source: None, - stack: None, - values: BTreeMap::new(), - }; - Ok(Some(error)) + + Ok(results) } } From 3e45e47c5d25f78f5c750177e863ef93fdb88c73 Mon Sep 17 00:00:00 2001 From: Mike Hilgendorf Date: Tue, 10 Feb 2026 10:19:05 -0600 Subject: [PATCH 4/5] remove ids from test, rename finish -> finalize in scripts --- packages/cli/test.nu | 2 +- packages/cli/tests/tree/double_build.nu | 12 +++++++++--- scripts/cloud/init.nu | 1 + 3 files changed, 11 insertions(+), 4 deletions(-) diff --git a/packages/cli/test.nu b/packages/cli/test.nu index e49f5223e..505e365cd 100644 --- a/packages/cli/test.nu +++ b/packages/cli/test.nu @@ -716,7 +716,7 @@ export def --env spawn [ "docker:docker@localhost:4500" | save -f $cluster nats stream create $'finalize_($id)' --discard new --retention work --subjects $'($id).finish' --defaults - nats consumer create $'finalize_($id)' finish --deliver all 
--max-pending 1000000 --pull --defaults + nats consumer create $'finalize_($id)' finalize --deliver all --max-pending 1000000 --pull --defaults nats stream create $'queue_($id)' --discard new --retention work --subjects $'($id).queue' --defaults nats consumer create $'queue_($id)' queue --deliver all --max-pending 1000000 --pull --defaults diff --git a/packages/cli/tests/tree/double_build.nu b/packages/cli/tests/tree/double_build.nu index f931b51c3..f64ab2a8c 100644 --- a/packages/cli/tests/tree/double_build.nu +++ b/packages/cli/tests/tree/double_build.nu @@ -23,10 +23,16 @@ snapshot $output '{"exit":0,"output":42}' let output = tg view $id --mode inline --expand-processes --depth 2 +# Extract only the cycle error message (the last error in the chain). +let output = $output + | lines + | str replace --all --regex 'fil_[a-z0-9]+' '' + | str replace --all --regex 'cmd_[a-z0-9]+' 'cmd_id>' snapshot $output ' - ✓ fil_01xw45e66hhhxemww9m1qmj7jp9n1zjhn3ewggx0f6eb9nwr4js46g#default - ├╴command: cmd_0166tgxrezqabf2zvd3mveyr2e1ss6ws1ehfeay2amxv5tadh47bhg + ✓ #default + ├╴command: cmd_id> ├╴output: 42 ├╴✓ ../b.tg.ts#default - └╴✓ fil_01sa3pyv7baf50x2ymmvy7p41zqnmmv8gp1fq5z3mq60ps8vcfxa30#default + └╴✓ #default + ' diff --git a/scripts/cloud/init.nu b/scripts/cloud/init.nu index 9e575ee2a..622ad1af1 100644 --- a/scripts/cloud/init.nu +++ b/scripts/cloud/init.nu @@ -6,6 +6,7 @@ nats consumer create finalize finalize --deliver all --max-pending 1000000 --pul nats stream create queue --discard new --retention work --subjects queue --defaults nats consumer create queue queue --deliver all --max-pending 1000000 --pull --defaults +nats consumer create queue cycle_detector --deliver all --max-pending 1000000 --pull --defaults cqlsh -e r#'create keyspace store with replication = { 'class': 'NetworkTopologyStrategy', 'replication_factor': 1 };'# cqlsh -k store -f packages/store/src/scylla.cql From 44faacf0c4bd93ca436c1eb5dfbc2d651edd0982 Mon Sep 17 00:00:00 2001 From: David Yamnitsky 
Date: Sun, 22 Feb 2026 13:48:32 -0500 Subject: [PATCH 5/5] fixes --- packages/cli/src/main.rs | 1 + packages/server/src/database/postgres.sql | 2 +- packages/server/src/lib.rs | 2 +- packages/server/src/watchdog.rs | 212 +++------------------- packages/server/src/watchdog/postgres.rs | 82 +++++++++ packages/server/src/watchdog/sqlite.rs | 104 +++++++++++ scripts/cloud/init.nu | 2 +- 7 files changed, 213 insertions(+), 192 deletions(-) create mode 100644 packages/server/src/watchdog/postgres.rs create mode 100644 packages/server/src/watchdog/sqlite.rs diff --git a/packages/cli/src/main.rs b/packages/cli/src/main.rs index 7862fb06b..a2c085bf1 100644 --- a/packages/cli/src/main.rs +++ b/packages/cli/src/main.rs @@ -690,6 +690,7 @@ impl Cli { // Start the server. let server = tangram_server::Server::start(config) + .boxed() .await .map_err(|source| tg::error!(!source, "failed to start the server"))?; diff --git a/packages/server/src/database/postgres.sql b/packages/server/src/database/postgres.sql index 6398117d9..205b1e22a 100644 --- a/packages/server/src/database/postgres.sql +++ b/packages/server/src/database/postgres.sql @@ -83,7 +83,7 @@ begin end; $$; -create or replace function get_cyclic_processes( +create or replace function get_process_cycles( parent_ids text[], child_ids text[] ) diff --git a/packages/server/src/lib.rs b/packages/server/src/lib.rs index 05d6cfabf..1af673f5d 100644 --- a/packages/server/src/lib.rs +++ b/packages/server/src/lib.rs @@ -488,7 +488,7 @@ impl Server { .await .map_err(|source| tg::error!(!source, "failed to create the queue consumer"))?; stream - .create_consumer("cycle_detector".to_owned(), consumer_config) + .create_consumer("watchdog".to_owned(), consumer_config) .await .map_err(|source| tg::error!(!source, "failed to create the queue consumer"))?; } diff --git a/packages/server/src/watchdog.rs b/packages/server/src/watchdog.rs index 37111d652..9d699de44 100644 --- a/packages/server/src/watchdog.rs +++ 
b/packages/server/src/watchdog.rs @@ -6,25 +6,30 @@ use { }, indoc::formatdoc, num::ToPrimitive as _, - std::{collections::BTreeMap, fmt::Write, pin::pin}, + std::{collections::BTreeMap, pin::pin}, tangram_client::prelude::*, tangram_database::{self as db, prelude::*}, tangram_messenger::{BatchConfig, prelude::*}, }; +#[cfg(feature = "postgres")] +mod postgres; +#[cfg(feature = "sqlite")] +mod sqlite; + impl Server { pub async fn watchdog_task(&self, config: &crate::config::Watchdog) { let expired_processes = tokio::spawn({ let server = self.clone(); let config = config.clone(); - async move { server.expired_process_task(&config).await } + async move { server.watchdog_heartbeat_task(&config).await } }); - let cyclic_processes = tokio::spawn({ + let process_cycles = tokio::spawn({ let server = self.clone(); let config = config.clone(); - async move { server.cyclic_processes_task(&config).await } + async move { server.watchdog_cycle_task(&config).await } }); - match future::select(expired_processes, cyclic_processes).await { + match future::select(expired_processes, process_cycles).await { future::Either::Left((result, task)) => { if let Err(error) = result { tracing::error!(?error, "watchdog task panicked"); @@ -44,11 +49,11 @@ impl Server { } } - async fn expired_process_task(&self, config: &crate::config::Watchdog) { + async fn watchdog_heartbeat_task(&self, config: &crate::config::Watchdog) { loop { // Finish processes. 
let result = self - .expired_process_task_inner(config) + .watchdog_heartbeat_task_inner(config) .inspect_err( |error| tracing::error!(error = %error.trace(), "failed to finish processes"), ) @@ -78,7 +83,7 @@ impl Server { } } - async fn expired_process_task_inner( + async fn watchdog_heartbeat_task_inner( &self, config: &crate::config::Watchdog, ) -> tg::Result { @@ -172,7 +177,7 @@ impl Server { Ok(n) } - async fn cyclic_processes_task(&self, config: &crate::config::Watchdog) { + async fn watchdog_cycle_task(&self, config: &crate::config::Watchdog) { // Subscribe to the consumer stream. let Ok(stream) = self .messenger @@ -183,7 +188,7 @@ impl Server { return; }; let Ok(consumer) = stream - .get_consumer("cycle_detector".into()) + .get_consumer("watchdog".into()) .await .inspect_err(|error| tracing::error!(?error, "failed to get the consumer")) else { @@ -217,9 +222,9 @@ impl Server { messages.push(message); ackers.push(acker); } - self.cyclic_processes_task_inner(&messages) + self.watchdog_cycle_task_inner(&messages) .await - .inspect_err(|error| tracing::error!(?error, "failed to finish cyclic processes")) + .inspect_err(|error| tracing::error!(?error, "failed to finish process cycles")) .ok(); while let Some(acker) = ackers.pop() { acker.ack().await.ok(); @@ -227,7 +232,7 @@ impl Server { } } - async fn cyclic_processes_task_inner( + async fn watchdog_cycle_task_inner( &self, messages: &[crate::process::queue::Message], ) -> tg::Result<()> { @@ -239,19 +244,19 @@ impl Server { let transaction = connection .transaction() .await - .map_err(|source| tg::error!(!source, "failed "))?; + .map_err(|source| tg::error!(!source, "failed to begin a transaction"))?; let results = match &transaction { #[cfg(feature = "postgres")] database::Transaction::Postgres(transaction) => { - Self::get_cyclic_processes_postgres(transaction, messages) + Self::get_process_cycles_postgres(transaction, messages) .await - .map_err(|source| tg::error!(!source, "failed to get cyclic 
processes"))? + .map_err(|source| tg::error!(!source, "failed to get process cycles"))? }, #[cfg(feature = "sqlite")] database::Transaction::Sqlite(transaction) => { - Self::get_cyclic_processes_sqlite(transaction, messages) + Self::get_process_cycles_sqlite(transaction, messages) .await - .map_err(|source| tg::error!(!source, "failed to update parent depths"))? + .map_err(|source| tg::error!(!source, "failed to get process cycles"))? }, }; @@ -284,175 +289,4 @@ impl Server { Ok(()) } - - #[cfg(feature = "postgres")] - async fn get_cyclic_processes_postgres( - transaction: &db::postgres::Transaction<'_>, - messages: &[crate::process::queue::Message], - ) -> tg::Result> { - // Build the parameter arrays dynamically. - let mut parent_placeholders = Vec::new(); - let mut child_placeholders = Vec::new(); - let mut params = Vec::new(); - let mut idx = 1; - for message in messages { - let Some(parent) = &message.parent else { - continue; - }; - parent_placeholders.push(format!("${idx}")); - idx += 1; - child_placeholders.push(format!("${idx}")); - idx += 1; - params.push(db::Value::Text(parent.to_string())); - params.push(db::Value::Text(message.process.to_string())); - } - if parent_placeholders.is_empty() { - return Ok(Vec::new()); - } - - // Call the stored function with a single query. - #[derive(db::row::Deserialize)] - struct Row { - process_id: String, - cycle_path: Option, - } - let statement = format!( - "select * from get_cyclic_processes(array[{}]::text[], array[{}]::text[]);", - parent_placeholders.join(", "), - child_placeholders.join(", "), - ); - let rows = transaction - .query_all_into::(statement.into(), params) - .await - .map_err(|source| tg::error!(!source, "failed to get cyclic processes"))?; - - // Build the results with formatted error messages. 
- let mut results = Vec::new(); - for row in rows { - let id: tg::process::Id = row - .process_id - .parse() - .map_err(|source| tg::error!(!source, "failed to parse the process id"))?; - let mut message = String::from("adding this child process creates a cycle"); - if let Some(cycle) = row.cycle_path { - let processes = cycle.split(' ').collect::>(); - for i in 0..processes.len() - 1 { - let parent = processes[i]; - let child = processes[i + 1]; - if i == 0 { - write!(&mut message, "\n{parent} tried to add child {child}").unwrap(); - } else { - write!(&mut message, "\n{parent} has child {child}").unwrap(); - } - } - } - let error = tg::error::Data { - code: None, - diagnostics: None, - location: None, - message: Some(message), - source: None, - stack: None, - values: BTreeMap::new(), - }; - results.push((id, error)); - } - Ok(results) - } - - #[cfg(feature = "sqlite")] - async fn get_cyclic_processes_sqlite( - transaction: &db::sqlite::Transaction<'_>, - messages: &[crate::process::queue::Message], - ) -> tg::Result> { - let mut results = Vec::new(); - let p = transaction.p(); - - for message in messages { - let Some(parent) = &message.parent else { - continue; - }; - let child = &message.process; - let params = db::params![parent.to_string(), child.to_string()]; - let statement = formatdoc!( - " - with recursive ancestors as ( - select {p}1 as id - union all - select process_children.process as id - from ancestors - join process_children on ancestors.id = process_children.child - ) - select exists( - select 1 from ancestors where id = {p}2 - ); - " - ); - let cycle = transaction - .query_one_value_into::(statement.into(), params) - .await - .map_err(|source| tg::error!(!source, "failed to execute the cycle check"))?; - - if !cycle { - continue; - } - - // Construct a good error message by collecting the entire cycle. 
- let mut message = String::from("adding this child process creates a cycle"); - - // Try to reconstruct the cycle path by walking from the child through its - // descendants until we find a path back to the parent. - let statement = formatdoc!( - " - with recursive reachable (current_process, path) as ( - select {p}2, {p}2 - - union - - select pc.child, r.path || ' ' || pc.child - from reachable r - join process_children pc on r.current_process = pc.process - where r.path not like '%' || pc.child || '%' - ) - select - {p}1 || ' ' || path as cycle - from reachable - where current_process = {p}1 - limit 1; - " - ); - let params = db::params![parent.to_string(), child.to_string()]; - let cycle = transaction - .query_one_value_into::(statement.into(), params) - .await - .inspect_err(|error| tracing::error!(?error, "failed to get the cycle")) - .ok(); - - // Format the error message. - if let Some(cycle) = cycle { - let processes = cycle.split(' ').collect::>(); - for i in 0..processes.len() - 1 { - let parent = processes[i]; - let child = processes[i + 1]; - if i == 0 { - write!(&mut message, "\n{parent} tried to add child {child}").unwrap(); - } else { - write!(&mut message, "\n{parent} has child {child}").unwrap(); - } - } - } - let error = tg::error::Data { - code: None, - diagnostics: None, - location: None, - message: Some(message), - source: None, - stack: None, - values: BTreeMap::new(), - }; - results.push((child.clone(), error)); - } - - Ok(results) - } } diff --git a/packages/server/src/watchdog/postgres.rs b/packages/server/src/watchdog/postgres.rs new file mode 100644 index 000000000..914af4c30 --- /dev/null +++ b/packages/server/src/watchdog/postgres.rs @@ -0,0 +1,82 @@ +use { + crate::Server, + std::{collections::BTreeMap, fmt::Write}, + tangram_client::prelude::*, + tangram_database::{self as db, prelude::*}, +}; + +impl Server { + pub(super) async fn get_process_cycles_postgres( + transaction: &db::postgres::Transaction<'_>, + messages: 
&[crate::process::queue::Message],
+    ) -> tg::Result<Vec<(tg::process::Id, tg::error::Data)>> {
+        // Build the parameter arrays dynamically.
+        let mut parent_placeholders = Vec::new();
+        let mut child_placeholders = Vec::new();
+        let mut params = Vec::new();
+        let mut index = 1;
+        for message in messages {
+            let Some(parent) = &message.parent else {
+                continue;
+            };
+            parent_placeholders.push(format!("${index}"));
+            index += 1;
+            child_placeholders.push(format!("${index}"));
+            index += 1;
+            params.push(db::Value::Text(parent.to_string()));
+            params.push(db::Value::Text(message.process.to_string()));
+        }
+        if parent_placeholders.is_empty() {
+            return Ok(Vec::new());
+        }
+
+        // Call the stored function with a single query.
+        #[derive(db::row::Deserialize)]
+        struct Row {
+            process_id: String,
+            cycle_path: Option<String>,
+        }
+        let statement = format!(
+            "select * from get_process_cycles(array[{}]::text[], array[{}]::text[]);",
+            parent_placeholders.join(", "),
+            child_placeholders.join(", "),
+        );
+        let rows = transaction
+            .query_all_into::<Row>(statement.into(), params)
+            .await
+            .map_err(|source| tg::error!(!source, "failed to get process cycles"))?;
+
+        // Build the results with formatted error messages.
+        let mut results = Vec::new();
+        for row in rows {
+            let id: tg::process::Id = row
+                .process_id
+                .parse()
+                .map_err(|source| tg::error!(!source, "failed to parse the process id"))?;
+            let mut message = String::from("adding this child process creates a cycle");
+            if let Some(cycle) = row.cycle_path {
+                let processes = cycle.split(' ').collect::<Vec<_>>();
+                for i in 0..processes.len() - 1 {
+                    let parent = processes[i];
+                    let child = processes[i + 1];
+                    if i == 0 {
+                        write!(&mut message, "\n{parent} tried to add child {child}").unwrap();
+                    } else {
+                        write!(&mut message, "\n{parent} has child {child}").unwrap();
+                    }
+                }
+            }
+            let error = tg::error::Data {
+                code: None,
+                diagnostics: None,
+                location: None,
+                message: Some(message),
+                source: None,
+                stack: None,
+                values: BTreeMap::new(),
+            };
+            results.push((id, error));
+        }
+        Ok(results)
+    }
+}
diff --git a/packages/server/src/watchdog/sqlite.rs b/packages/server/src/watchdog/sqlite.rs
new file mode 100644
index 000000000..dc54d4769
--- /dev/null
+++ b/packages/server/src/watchdog/sqlite.rs
@@ -0,0 +1,104 @@
+use {
+    crate::Server,
+    indoc::formatdoc,
+    std::{collections::BTreeMap, fmt::Write},
+    tangram_client::prelude::*,
+    tangram_database::{self as db, prelude::*},
+};
+
+impl Server {
+    pub(super) async fn get_process_cycles_sqlite(
+        transaction: &db::sqlite::Transaction<'_>,
+        messages: &[crate::process::queue::Message],
+    ) -> tg::Result<Vec<(tg::process::Id, tg::error::Data)>> {
+        let mut results = Vec::new();
+        let p = transaction.p();
+
+        for message in messages {
+            let Some(parent) = &message.parent else {
+                continue;
+            };
+            let child = &message.process;
+            let params = db::params![parent.to_string(), child.to_string()];
+            let statement = formatdoc!(
+                "
+                with recursive ancestors as (
+                    select {p}1 as id
+                    union
+                    select process_children.process as id
+                    from ancestors
+                    join process_children on ancestors.id = process_children.child
+                )
+                select exists(
+                    select 1 from ancestors where id = {p}2
+                );
+                "
+            );
+            let cycle = transaction
+
.query_one_value_into::<bool>(statement.into(), params)
+                .await
+                .map_err(|source| tg::error!(!source, "failed to execute the cycle check"))?;
+
+            if !cycle {
+                continue;
+            }
+
+            // Construct a good error message by collecting the entire cycle.
+            let mut message = String::from("adding this child process creates a cycle");
+
+            // Try to reconstruct the cycle path by walking from the child through its
+            // descendants until we find a path back to the parent.
+            let statement = formatdoc!(
+                "
+                with recursive reachable (current_process, path) as (
+                    select {p}2, {p}2
+
+                    union
+
+                    select pc.child, r.path || ' ' || pc.child
+                    from reachable r
+                    join process_children pc on r.current_process = pc.process
+                    where r.path not like '%' || pc.child || '%'
+                )
+                select
+                    {p}1 || ' ' || path as cycle
+                from reachable
+                where current_process = {p}1
+                limit 1;
+                "
+            );
+            let params = db::params![parent.to_string(), child.to_string()];
+            let cycle = transaction
+                .query_one_value_into::<String>(statement.into(), params)
+                .await
+                .inspect_err(|error| tracing::error!(?error, "failed to get the cycle"))
+                .ok();
+
+            // Format the error message.
+            if let Some(cycle) = cycle {
+                let processes = cycle.split(' ').collect::<Vec<_>>();
+                for i in 0..processes.len() - 1 {
+                    let parent = processes[i];
+                    let child = processes[i + 1];
+                    if i == 0 {
+                        write!(&mut message, "\n{parent} tried to add child {child}").unwrap();
+                    } else {
+                        write!(&mut message, "\n{parent} has child {child}").unwrap();
+                    }
+                }
+            }
+            let error = tg::error::Data {
+                code: None,
+                diagnostics: None,
+                location: None,
+                message: Some(message),
+                source: None,
+                stack: None,
+                values: BTreeMap::new(),
+            };
+            results.push((child.clone(), error));
+        }
+
+        Ok(results)
+    }
+}
diff --git a/scripts/cloud/init.nu b/scripts/cloud/init.nu
index 622ad1af1..eda1ee517 100644
--- a/scripts/cloud/init.nu
+++ b/scripts/cloud/init.nu
@@ -6,7 +6,7 @@ nats consumer create finalize finalize --deliver all --max-pending 1000000 --pul
 nats stream create queue --discard new --retention work --subjects queue --defaults
 nats consumer create queue queue --deliver all --max-pending 1000000 --pull --defaults
 
-nats consumer create queue cycle_detector --deliver all --max-pending 1000000 --pull --defaults
+nats consumer create queue watchdog --deliver all --max-pending 1000000 --pull --defaults
 
 cqlsh -e r#'create keyspace store with replication = { 'class': 'NetworkTopologyStrategy', 'replication_factor': 1 };'#
 cqlsh -k store -f packages/store/src/scylla.cql