diff --git a/agentwire/macros/src/broker.rs b/agentwire/macros/src/broker.rs index 4a605a60..53dfb9ca 100644 --- a/agentwire/macros/src/broker.rs +++ b/agentwire/macros/src/broker.rs @@ -187,8 +187,10 @@ pub fn proc_macro_derive(input: TokenStream) -> TokenStream { let handler = format_ident!("handle_{}", ident); quote! { if let Some(port) = fut.broker.#ident.enabled() { + any_handler_enabled |= true; loop { match ::futures::StreamExt::poll_next_unpin(port, cx) { + // check if message is newer than fence ::std::task::Poll::Ready(Some(output)) if output.source_ts > fence => { match fut.broker.#handler(fut.plan, output) { ::std::result::Result::Ok(::agentwire::BrokerFlow::Break) => { @@ -210,9 +212,10 @@ pub fn proc_macro_derive(input: TokenStream) -> TokenStream { } } ::std::task::Poll::Ready(::std::option::Option::Some(_)) => { - continue; + continue; // skip message because its older than `fence` } ::std::task::Poll::Ready(::std::option::Option::None) => { + // channel sender is dropped, which means agent terminated return ::std::task::Poll::Ready( ::std::result::Result::Err( ::agentwire::BrokerError::AgentTerminated( @@ -222,7 +225,7 @@ pub fn proc_macro_derive(input: TokenStream) -> TokenStream { ); } ::std::task::Poll::Pending => { - break; + break; // No more messages to process } } } @@ -249,7 +252,7 @@ pub fn proc_macro_derive(input: TokenStream) -> TokenStream { }, ); let run = quote! { - #[allow(missing_docs)] + /// Future for [`#ident::run`]. pub struct #run_fut_name<'a> { broker: &'a mut #ident, plan: &'a mut dyn #broker_plan, @@ -265,20 +268,32 @@ pub fn proc_macro_derive(input: TokenStream) -> TokenStream { ) -> ::std::task::Poll { let fence = self.fence; let fut = self.as_mut().get_mut(); + let mut any_handler_enabled = false; 'outer: loop { #(#run_handlers)* #poll_extra + #[allow(unreachable_code)] + if !any_handler_enabled { + // Prevent infinite loop in edge case where no handlers are + // enabled. + return ::std::task::Poll::Pending; + } } + } } impl #ident { - #[allow(missing_docs)] + /// Equivalent to [`Self::run_with_fence()`] with a fence of `Instant::now()`. pub fn run<'a>(&'a mut self, plan: &'a mut dyn #broker_plan) -> #run_fut_name<'a> { Self::run_with_fence(self, plan, ::std::time::Instant::now()) } - #[allow(missing_docs)] + /// Runs the broker, filtering any events to only those with a timestamp + /// newer than `fence`. + /// + /// Events are fed the broker's `handle_*` functions, and `plan` is passed + /// there as an argument. pub fn run_with_fence<'a>( &'a mut self, plan: &'a mut dyn #broker_plan, diff --git a/agentwire/tests/broker.rs b/agentwire/tests/broker.rs new file mode 100644 index 00000000..bd334828 --- /dev/null +++ b/agentwire/tests/broker.rs @@ -0,0 +1,103 @@ +use agentwire::{ + agent::{Cell, Task}, + port::{self, Port}, + Agent, Broker, BrokerFlow, +}; +use futures::FutureExt; + +#[derive(Debug, thiserror::Error)] +#[error("dummy error")] +pub struct Error; + +struct DummyAgent; + +impl Port for DummyAgent { + type Input = (); + type Output = (); + + const INPUT_CAPACITY: usize = 0; + const OUTPUT_CAPACITY: usize = 0; +} + +impl Agent for DummyAgent { + const NAME: &'static str = "dummy"; +} + +impl Task for DummyAgent { + type Error = Error; + + async fn run(self, _port: agentwire::port::Inner) -> Result<(), Self::Error> { + std::future::pending().await + } +} + +#[derive(Broker)] +#[broker(plan = PlanT, error = Error)] +struct NoAgents {} + +impl NoAgents { + fn new() -> Self { + new_no_agents!() + } +} + +#[derive(Broker)] +#[broker(plan = PlanT, error = Error)] +struct OneAgent { + #[agent(task, init)] + dummy: Cell, +} + +impl OneAgent { + fn new() -> Self { + new_one_agent!() + } + + #[expect(dead_code, reason = "agent never enabled")] + fn init_dummy(&mut self) -> DummyAgent { + DummyAgent + } + + fn handle_dummy( + &mut self, + _plan: &mut dyn PlanT, + _output: port::Output, + ) -> Result { + unreachable!("agent never enabled") + } +} + +trait PlanT {} + +#[derive(Debug)] +struct Plan; + +impl PlanT for Plan {} + +#[test] +fn test_broker_with_no_agents_never_blocks() { + let waker = futures::task::noop_waker(); + let mut cx = std::task::Context::from_waker(&waker); + let mut plan = Plan; + + let mut no_agents = NoAgents::new(); + let mut run_fut = no_agents.run(&mut plan); + // poll should always immediately return, instead of looping forever. + assert!(run_fut.poll_unpin(&mut cx).is_pending()); + assert!(run_fut.poll_unpin(&mut cx).is_pending()); + assert!(run_fut.poll_unpin(&mut cx).is_pending()); +} + +#[test] +fn test_broker_with_one_agent_never_blocks() { + let waker = futures::task::noop_waker(); + let mut cx = std::task::Context::from_waker(&waker); + let mut plan = Plan; + + let mut one_agent = OneAgent::new(); + let mut run_fut = one_agent.run(&mut plan); + // poll should always immediately return, instead of looping forever. + assert!(run_fut.poll_unpin(&mut cx).is_pending()); + assert!(run_fut.poll_unpin(&mut cx).is_pending()); + assert!(run_fut.poll_unpin(&mut cx).is_pending()); +}