Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions packages/cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -690,6 +690,7 @@ impl Cli {

// Start the server.
let server = tangram_server::Server::start(config)
.boxed()
.await
.map_err(|source| tg::error!(!source, "failed to start the server"))?;

Expand Down
2 changes: 1 addition & 1 deletion packages/cli/test.nu
Original file line number Diff line number Diff line change
Expand Up @@ -716,7 +716,7 @@ export def --env spawn [
"docker:docker@localhost:4500" | save -f $cluster

nats stream create $'finalize_($id)' --discard new --retention work --subjects $'($id).finish' --defaults
nats consumer create $'finalize_($id)' finish --deliver all --max-pending 1000000 --pull --defaults
nats consumer create $'finalize_($id)' finalize --deliver all --max-pending 1000000 --pull --defaults
nats stream create $'queue_($id)' --discard new --retention work --subjects $'($id).queue' --defaults
nats consumer create $'queue_($id)' queue --deliver all --max-pending 1000000 --pull --defaults

Expand Down
12 changes: 9 additions & 3 deletions packages/cli/tests/tree/double_build.nu
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,16 @@ snapshot $output '{"exit":0,"output":42}'

let output = tg view $id --mode inline --expand-processes --depth 2

# Replace the file and command IDs in the output with fixed placeholders before taking the snapshot.
let output = $output
| lines
| str replace --all --regex 'fil_[a-z0-9]+' '<fil_id>'
| str replace --all --regex 'cmd_[a-z0-9]+' 'cmd_id>'
snapshot $output '
fil_01xw45e66hhhxemww9m1qmj7jp9n1zjhn3ewggx0f6eb9nwr4js46g#default
├╴command: cmd_0166tgxrezqabf2zvd3mveyr2e1ss6ws1ehfeay2amxv5tadh47bhg
<fil_id>#default
├╴command: cmd_id>
├╴output: 42
├╴✓ ../b.tg.ts#default
└╴✓ fil_01sa3pyv7baf50x2ymmvy7p41zqnmmv8gp1fq5z3mq60ps8vcfxa30#default
└╴✓ <fil_id>#default

'
23 changes: 19 additions & 4 deletions packages/messenger/src/memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,15 @@ struct StreamState {
closed: bool,
config: StreamConfig,
consumers: HashMap<String, (Consumer, ConsumerState)>,
messages: BTreeMap<u64, Arc<dyn Payload>>,
messages: BTreeMap<u64, MessageState>,
sequence: u64,
}

/// Per-message bookkeeping kept in a stream's `messages` map.
struct MessageState {
	/// The number of acknowledgements still outstanding. It is initialized to
	/// the number of consumers registered on the stream when the message is
	/// published, decremented on each acknowledgement, and the message is
	/// removed from the stream once it reaches zero.
	acks_remaining: usize,

	/// The message payload, shared with every consumer that fetches it.
	payload: Arc<dyn Payload>,
}

#[derive(Debug)]
struct ConsumerState {
#[expect(dead_code)]
Expand Down Expand Up @@ -324,7 +329,12 @@ impl Stream {
state.sequence += 1;
let sequence = state.sequence;
let payload: Arc<dyn Payload> = Arc::new(payload);
state.messages.insert(sequence, payload);
let acks_remaining = state.consumers.len();
let message_state = MessageState {
payload,
acks_remaining,
};
state.messages.insert(sequence, message_state);
sequences.push(sequence);
}

Expand Down Expand Up @@ -452,7 +462,7 @@ impl Consumer {
.iter()
.skip_while(|(sequence, _)| **sequence <= consumer_sequence)
.take(max_messages)
.map(|(sequence, payload)| (*sequence, payload.clone()))
.map(|(sequence, state)| (*sequence, Arc::clone(&state.payload)))
.collect::<Vec<_>>();

// Set the consumer's sequence number.
Expand Down Expand Up @@ -506,7 +516,12 @@ impl Consumer {
.upgrade()
.ok_or_else(|| Error::other("the stream was destroyed"))?;
let mut state = stream.state.write().unwrap();
state.messages.remove(&sequence);
if let Some(message_state) = state.messages.get_mut(&sequence) {
message_state.acks_remaining = message_state.acks_remaining.saturating_sub(1);
if message_state.acks_remaining == 0 {
state.messages.remove(&sequence);
}
}
Ok(())
}
}
Expand Down
68 changes: 64 additions & 4 deletions packages/server/src/database/postgres.sql
Original file line number Diff line number Diff line change
Expand Up @@ -46,16 +46,18 @@ as $$
declare
current_ids text[];
updated_ids text[];
visited_ids text[] := '{}';
begin
current_ids := changed_process_ids;

while array_length(current_ids, 1) is not null and array_length(current_ids, 1) > 0 loop
-- Update parents based on their children's depths
-- Update parents based on their children's depths, excluding already visited parents to handle cycles.
with child_depths as (
select process_children.process, max(processes.depth) as max_child_depth
from process_children
join processes on processes.id = process_children.child
where process_children.child = any(current_ids)
and not process_children.process = any(visited_ids)
group by process_children.process
),
updated as (
Expand All @@ -69,15 +71,73 @@ begin
select coalesce(array_agg(id), '{}') into updated_ids
from updated;

-- Exit if no parents were updated
-- Mark the current batch as visited.
visited_ids := visited_ids || current_ids;

-- Exit if no parents were updated.
exit when cardinality(updated_ids) = 0;

-- Continue with the updated parents
-- Continue with the updated parents.
current_ids := updated_ids;
end loop;
end;
$$;

-- Detect which proposed parent/child edges would introduce a cycle into process_children.
--
-- parent_ids and child_ids are parallel arrays: element i of each describes one proposed
-- edge "parent_ids[i] has child child_ids[i]". The loop runs over the length of parent_ids;
-- a null or empty parent_ids yields no rows.
--
-- One row is returned for every edge that would create a cycle:
--   process_id: the child of the offending edge.
--   cycle_path: a space-separated list of process IDs, starting with the parent and followed
--               by a path from the child down through existing children back to the parent.
--               It is taken from the first matching path, if any was found.
create or replace function get_process_cycles(
parent_ids text[],
child_ids text[]
)
returns table(process_id text, cycle_path text)
language plpgsql
as $$
declare
i int;
p_id text;
c_id text;
is_cycle boolean;
found_path text;
begin
-- Examine each proposed (parent, child) pair independently.
for i in 1..coalesce(array_length(parent_ids, 1), 0) loop
p_id := parent_ids[i];
c_id := child_ids[i];

-- Check if adding c_id as a child of p_id creates a cycle by walking ancestors of p_id.
-- The walk is seeded with p_id itself, so a self-edge (p_id = c_id) is also reported.
-- "union" (rather than "union all") discards duplicate rows, so the recursion terminates
-- even if process_children already contains a cycle.
select exists(
with recursive ancestors as (
select p_id as ancestor_id
union
select pc.process
from ancestors a
join process_children pc on a.ancestor_id = pc.child
)
select 1 from ancestors where ancestor_id = c_id
) into is_cycle;

-- No cycle for this pair, so move on to the next one.
if not is_cycle then
continue;
end if;

-- Reconstruct the cycle path by walking descendants of c_id to find p_id.
-- Each row carries the space-separated path taken so far. The "not like" guard skips a
-- child whose ID already appears in the path, so a path never revisits a process.
select p_id || ' ' || r.path into found_path
from (
with recursive reachable(current_process, path) as (
select c_id, c_id::text
union
select pc.child, r.path || ' ' || pc.child
from reachable r
join process_children pc on r.current_process = pc.process
where r.path not like '%' || pc.child || '%'
)
-- Only one path back to p_id is needed for the report.
select current_process, path from reachable where current_process = p_id limit 1
) r;

-- Emit one result row for this offending edge.
process_id := c_id;
cycle_path := found_path;
return next;
end loop;
end;
$$;

create table process_tokens (
process text not null,
token text not null
Expand All @@ -90,8 +150,8 @@ create index process_tokens_token_index on process_tokens (token);
create table process_children (
process text not null,
child text not null,
position int8 not null,
options text,
position int8 not null,
token text
);

Expand Down
14 changes: 6 additions & 8 deletions packages/server/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -484,7 +484,11 @@ impl Server {
deliver: tangram_messenger::DeliverPolicy::All,
};
stream
.create_consumer("queue".to_owned(), consumer_config)
.create_consumer("queue".to_owned(), consumer_config.clone())
.await
.map_err(|source| tg::error!(!source, "failed to create the queue consumer"))?;
// Create the "watchdog" consumer on the same stream, reusing the consumer config. The
// error message names the watchdog consumer so a failure here is not mistaken for a
// failure to create the "queue" consumer.
stream
	.create_consumer("watchdog".to_owned(), consumer_config)
	.await
	.map_err(|source| tg::error!(!source, "failed to create the watchdog consumer"))?;
}
Expand Down Expand Up @@ -797,13 +801,7 @@ impl Server {
let server = server.clone();
let config = config.clone();
async move {
server
.watchdog_task(&config)
.await
.inspect_err(
|error| tracing::error!(error = %error.trace(), "the watchdog task failed"),
)
.ok();
server.watchdog_task(&config).await;
}
})
});
Expand Down
11 changes: 7 additions & 4 deletions packages/server/src/process/queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,10 @@ use {
)]
pub struct Message {
#[tangram_serialize(id = 0)]
pub id: tg::process::Id,
pub process: tg::process::Id,

#[tangram_serialize(id = 1)]
pub parent: Option<tg::process::Id>,
}

impl Server {
Expand Down Expand Up @@ -78,7 +81,7 @@ impl Server {
"
);
let now = time::OffsetDateTime::now_utc().unix_timestamp();
let params = db::params![payload.id.to_string(), now];
let params = db::params![payload.process.to_string(), now];
let result = connection
.execute(statement.into(), params)
.await
Expand All @@ -96,12 +99,12 @@ impl Server {
}

// Publish a message that the status changed.
let subject = format!("processes.{}.status", payload.id);
let subject = format!("processes.{}.status", payload.process);
self.messenger.publish(subject, ()).await.ok();

// Return the dequeued process.
let output = tg::process::queue::Output {
process: payload.id,
process: payload.process,
};

return Ok(Some(output));
Expand Down
79 changes: 11 additions & 68 deletions packages/server/src/process/spawn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use {
stream::{self, BoxStream, FuturesUnordered},
},
indoc::{formatdoc, indoc},
std::{fmt::Write, pin::pin},
std::pin::pin,
tangram_client::prelude::*,
tangram_database::{self as db, prelude::*},
tangram_futures::{stream::Ext as _, task::Task},
Expand Down Expand Up @@ -193,7 +193,8 @@ impl Server {
self.spawn_process_task(&process, permit, clean_guard);
} else {
let payload = crate::process::queue::Message {
id: output.id.clone(),
process: output.id.clone(),
parent: arg.parent.clone(),
};
self.messenger
.stream_publish("queue".into(), payload)
Expand Down Expand Up @@ -973,72 +974,6 @@ impl Server {
) -> tg::Result<()> {
let p = transaction.p();

// Determine if adding this child process creates a cycle.
let statement = formatdoc!(
"
with recursive ancestors as (
select {p}1 as id
union all
select process_children.process as id
from ancestors
join process_children on ancestors.id = process_children.child
)
select exists(
select 1 from ancestors where id = {p}2
);
"
);
let params = db::params![parent.to_string(), child.to_string()];
let cycle = transaction
.query_one_value_into::<bool>(statement.into(), params)
.await
.map_err(|source| tg::error!(!source, "failed to execute the cycle check"))?;

// If adding this child creates a cycle, return an error.
if cycle {
// Try to reconstruct the cycle path by walking from the child through its
// descendants until we find a path back to the parent.
let statement = formatdoc!(
"
with recursive reachable (current_process, path) as (
select {p}2, {p}2

union

select pc.child, r.path || ' ' || pc.child
from reachable r
join process_children pc on r.current_process = pc.process
where r.path not like '%' || pc.child || '%'
)
select
{p}1 || ' ' || path as cycle
from reachable
where current_process = {p}1
limit 1;
"
);
let params = db::params![parent.to_string(), child.to_string()];
let cycle = transaction
.query_one_value_into::<String>(statement.into(), params)
.await
.inspect_err(|error| tracing::error!(?error, "failed to get the cycle"))
.ok();
let mut message = String::from("adding this child process creates a cycle");
if let Some(cycle) = cycle {
let processes = cycle.split(' ').collect::<Vec<_>>();
for i in 0..processes.len() - 1 {
let parent = processes[i];
let child = processes[i + 1];
if i == 0 {
write!(&mut message, "\n{parent} tried to add child {child}").unwrap();
} else {
write!(&mut message, "\n{parent} has child {child}").unwrap();
}
}
}
return Err(tg::error!("{message}"));
}

// Add the child to the database.
let statement = formatdoc!(
"
Expand Down Expand Up @@ -1090,6 +1025,7 @@ impl Server {
child_ids: Vec<String>,
) -> tg::Result<()> {
let mut current_ids = child_ids;
let mut visited = std::collections::HashSet::new();

while !current_ids.is_empty() {
let mut updated_ids = Vec::new();
Expand Down Expand Up @@ -1119,6 +1055,10 @@ impl Server {

// Update each parent's depth if needed.
for parent in parents {
// Skip parents that have already been visited to avoid infinite loops from cycles.
if visited.contains(&parent.process) {
continue;
}
if let Some(max_child_depth) = parent.max_child_depth {
let statement = indoc!(
"
Expand All @@ -1144,6 +1084,9 @@ impl Server {
}
}

// Mark the current batch as visited.
visited.extend(current_ids);

// Exit if no parents were updated.
if updated_ids.is_empty() {
break;
Expand Down
Loading