From e5f7248462d00f982c5ae1edc94277e1930020b7 Mon Sep 17 00:00:00 2001 From: Marius Eriksen Date: Wed, 6 Aug 2025 08:07:54 -0700 Subject: [PATCH 1/2] "inverted control" mode through Proc::instance Summary: Adds Proc::instance() which returns an actor instance and its corresponding handle. This allows the user to create a regular actor without any message handlers. The returned `Instance` provides all the normal capabilities, including sending and receiving messages, being able to spawn and manage child actors, etc. This is the foundation for a kind of "script mode" actor. Differential Revision: D79685752 --- hyperactor/src/actor.rs | 23 +++- hyperactor/src/proc.rs | 216 ++++++++++++++++++++++++++-------- hyperactor/src/supervision.rs | 13 ++ 3 files changed, 202 insertions(+), 50 deletions(-) diff --git a/hyperactor/src/actor.rs b/hyperactor/src/actor.rs index 0b046fcef..34398bf93 100644 --- a/hyperactor/src/actor.rs +++ b/hyperactor/src/actor.rs @@ -135,6 +135,17 @@ pub trait Actor: Sized + Send + Debug + 'static { } } +/// An actor that does nothing. It is used to represent "client only" actors, +/// returned by [`Proc::instance`]. +#[async_trait] +impl Actor for () { + type Params = (); + + async fn new(params: Self::Params) -> Result { + Ok(params) + } +} + /// A Handler allows an actor to handle a specific message type. #[async_trait] pub trait Handler: Actor { @@ -356,6 +367,16 @@ pub enum Signal { ChildStopped(Index), } +impl fmt::Display for Signal { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Signal::DrainAndStop => write!(f, "DrainAndStop"), + Signal::Stop => write!(f, "Stop"), + Signal::ChildStopped(index) => write!(f, "ChildStopped({})", index), + } + } +} + /// The runtime status of an actor. #[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone, Named)] pub enum ActorStatus { @@ -390,7 +411,7 @@ pub enum ActorStatus { impl ActorStatus { /// Tells whether the status is a terminal state. 
- fn is_terminal(&self) -> bool { + pub(crate) fn is_terminal(&self) -> bool { matches!(self, Self::Stopped | Self::Failed(_)) } diff --git a/hyperactor/src/proc.rs b/hyperactor/src/proc.rs index 2e58a1f00..bfa72acee 100644 --- a/hyperactor/src/proc.rs +++ b/hyperactor/src/proc.rs @@ -460,7 +460,7 @@ impl Proc { actor_type = std::any::type_name::(), actor_id = actor_id.to_string(), ); - let instance = Instance::new(self.clone(), actor_id.clone(), None); + let instance = Instance::new(self.clone(), actor_id.clone(), false, None); let actor = A::new(params).await?; // Add this actor to the proc's actor ledger. We do not actively remove // inactive actors from ledger, because the actor's state can be inferred @@ -472,6 +472,28 @@ impl Proc { instance.start(actor).await } + /// Create and return an actor instance and its corresponding handle. This allows actors to be + /// "inverted": the caller can use the returned [`Instance`] to send and receive messages, + /// launch child actors, etc. The actor itself does not handle any messages, and supervision events + /// are always forwarded to the proc. Otherwise the instance acts as a normal actor, and can be + /// referenced and stopped. + pub fn instance(&self, name: &str) -> Result<(Instance<()>, ActorHandle<()>), anyhow::Error> { + let actor_id = self.allocate_root_id(name)?; + let _ = tracing::debug_span!( + "actor_instance", + actor_name = name, + actor_type = std::any::type_name::<()>(), + actor_id = actor_id.to_string(), + ); + + let instance = Instance::new(self.clone(), actor_id.clone(), true, None); + let handle = ActorHandle::new(instance.cell.clone(), instance.ports.clone()); + + instance.change_status(ActorStatus::Client); + + Ok((instance, handle)) + } + /// Spawn a child actor from the provided parent on this proc. The parent actor /// must already belong to this proc, a fact which is asserted in code. 
/// @@ -483,7 +505,7 @@ impl Proc { params: A::Params, ) -> Result, anyhow::Error> { let actor_id = self.allocate_child_id(parent.actor_id())?; - let instance = Instance::new(self.clone(), actor_id, Some(parent.clone())); + let instance = Instance::new(self.clone(), actor_id, false, Some(parent.clone())); let actor = A::new(params).await?; instance.start(actor).await } @@ -773,13 +795,8 @@ pub struct Instance { /// The mailbox associated with the actor. mailbox: Mailbox, - /// The actor's signal receiver. This is used to - /// receive signals sent to the actor. - signal_receiver: PortReceiver, - - /// The actor's supervision event receiver. This is used to receive supervision - /// event from the parent. - supervision_event_receiver: PortReceiver, + /// Receivers for the actor loop, if available. + actor_loop_receivers: Option<(PortReceiver, PortReceiver)>, ports: Arc>, @@ -797,19 +814,17 @@ pub struct Instance { impl Instance { /// Create a new actor instance in Created state. - pub(crate) fn new(proc: Proc, actor_id: ActorId, parent: Option) -> Self { + pub(crate) fn new( + proc: Proc, + actor_id: ActorId, + detached: bool, + parent: Option, + ) -> Self { // Set up messaging let mailbox = Mailbox::new(actor_id.clone(), BoxedMailboxSender::new(proc.downgrade())); let (work_tx, work_rx) = mpsc::unbounded_channel(); - let ports: Arc> = Arc::new(Ports::new(mailbox.clone(), work_tx)); - let (signal_port, signal_receiver) = ports.open_message_port().unwrap(); - proc.state().proc_muxer.bind_mailbox(mailbox.clone()); - - // Supervision event port is used locally in the proc - let (supervision_port, supervision_event_receiver) = mailbox.open_port(); - let (status_tx, status_rx) = watch::channel(ActorStatus::Created); let actor_type = match TypeInfo::of::() { @@ -818,12 +833,24 @@ impl Instance { }; let ais = actor_id.to_string(); + let actor_loop_ports = if detached { + None + } else { + let (signal_port, signal_receiver) = ports.open_message_port().unwrap(); + let 
(supervision_port, supervision_receiver) = mailbox.open_port(); + Some(( + (signal_port, supervision_port), + (signal_receiver, supervision_receiver), + )) + }; + + let (actor_loop, actor_loop_receivers) = actor_loop_ports.unzip(); + let cell = InstanceCell::new( actor_id, actor_type, proc.clone(), - signal_port, - supervision_port, + actor_loop, status_rx, parent, ports.clone(), @@ -834,8 +861,7 @@ impl Instance { proc, cell, mailbox, - signal_receiver, - supervision_event_receiver, + actor_loop_receivers, ports, work_rx, status_tx, @@ -934,7 +960,9 @@ impl Instance { } async fn serve(mut self, mut actor: A) { - let result = self.run_actor_tree(&mut actor).await; + let actor_loop_receivers = self.actor_loop_receivers.take().unwrap(); + + let result = self.run_actor_tree(&mut actor, actor_loop_receivers).await; let actor_status = match result { Ok(_) => ActorStatus::Stopped, @@ -978,13 +1006,20 @@ impl Instance { /// Runs the actor, and manages its supervision tree. When the function returns, /// the whole tree rooted at this actor has stopped. - async fn run_actor_tree(&mut self, actor: &mut A) -> Result<(), ActorError> { + async fn run_actor_tree( + &mut self, + actor: &mut A, + mut actor_loop_receivers: (PortReceiver, PortReceiver), + ) -> Result<(), ActorError> { // It is okay to catch all panics here, because we are in a tokio task, // and tokio will catch the panic anyway: // https://docs.rs/tokio/latest/tokio/task/struct.JoinError.html#method.is_panic // What we do here is just to catch it early so we can handle it. - let result = match AssertUnwindSafe(self.run(actor)).catch_unwind().await { + let result = match AssertUnwindSafe(self.run(actor, &mut actor_loop_receivers)) + .catch_unwind() + .await + { Ok(result) => result, Err(err) => { // This is only the error message. Backtrace is not included. 
@@ -1027,8 +1062,9 @@ impl Instance { self.cell.unlink(&child); } + let (mut signal_receiver, _) = actor_loop_receivers; while self.cell.child_count() > 0 { - match self.signal_receiver.recv().await? { + match signal_receiver.recv().await? { Signal::ChildStopped(pid) => { assert!(self.cell.get_child(pid).is_none()); } @@ -1040,9 +1076,15 @@ impl Instance { } /// Initialize and run the actor until it fails or is stopped. - async fn run(&mut self, actor: &mut A) -> Result<(), ActorError> { + async fn run( + &mut self, + actor: &mut A, + actor_loop_receivers: &mut (PortReceiver, PortReceiver), + ) -> Result<(), ActorError> { tracing::debug!("entering actor loop: {}", self.self_id()); + let (signal_receiver, supervision_event_receiver) = actor_loop_receivers; + self.change_status(ActorStatus::Initializing); actor .init(self) @@ -1060,13 +1102,13 @@ impl Instance { let _ = ACTOR_MESSAGE_HANDLER_DURATION.start(metric_pairs); let work = work.expect("inconsistent work queue state"); if let Err(err) = work.handle(actor, self).await { - for supervision_event in self.supervision_event_receiver.drain() { + for supervision_event in supervision_event_receiver.drain() { self.handle_supervision_event(actor, supervision_event).await; } return Err(ActorError::new(self.self_id().clone(), ActorErrorKind::Processing(err))); } } - signal = self.signal_receiver.recv() => { + signal = signal_receiver.recv() => { let signal = signal.map_err(ActorError::from); tracing::debug!("Received signal {signal:?}"); match signal? 
{ @@ -1079,7 +1121,7 @@ impl Instance { }, } } - Ok(supervision_event) = self.supervision_event_receiver.recv() => { + Ok(supervision_event) = supervision_event_receiver.recv() => { self.handle_supervision_event(actor, supervision_event).await; } } @@ -1204,6 +1246,19 @@ impl Instance { } } +impl Drop for Instance { + fn drop(&mut self) { + self.status_tx.send_if_modified(|status| { + if status.is_terminal() { + false + } else { + *status = ActorStatus::Stopped; + true + } + }); + } +} + impl cap::sealed::CanSend for Instance { fn post(&self, dest: PortId, headers: Attrs, data: Serialized) { let envelope = MessageEnvelope::new(self.self_id().clone(), dest, data, headers); @@ -1339,13 +1394,8 @@ struct InstanceState { /// The proc in which the actor is running. proc: Proc, - /// The actor's signal port. This is used to send - /// signals to the actor. - signal: PortHandle, - - /// The actor's supervision port. This is used to send - /// supervision event to the actor (usually by its children). - supervision_port: PortHandle, + /// Control port handles to the actor loop, if one is running. + actor_loop: Option<(PortHandle, PortHandle)>, /// An observer that stores the current status of the actor. status: watch::Receiver, @@ -1397,8 +1447,7 @@ impl InstanceCell { actor_id: ActorId, actor_type: ActorType, proc: Proc, - signal: PortHandle, - supervision_port: PortHandle, + actor_loop: Option<(PortHandle, PortHandle)>, status: watch::Receiver, parent: Option, ports: Arc, @@ -1409,8 +1458,7 @@ impl InstanceCell { actor_id: actor_id.clone(), actor_type, proc: proc.clone(), - signal, - supervision_port, + actor_loop, status, parent: parent.map_or_else(WeakInstanceCell::new, |cell| cell.downgrade()), children: DashMap::new(), @@ -1455,7 +1503,16 @@ impl InstanceCell { /// Send a signal to the actor. #[allow(clippy::result_large_err)] // TODO: Consider reducing the size of `ActorError`. 
pub fn signal(&self, signal: Signal) -> Result<(), ActorError> { - self.inner.signal.send(signal).map_err(ActorError::from) + if let Some((signal_port, _)) = &self.inner.actor_loop { + signal_port.send(signal).map_err(ActorError::from) + } else { + tracing::warn!( + "{}: attempted to send signal {} to detached actor", + self.inner.actor_id, + signal + ); + Ok(()) + } } /// Used by this actor's children to send a supervision event to this actor. @@ -1467,14 +1524,25 @@ impl InstanceCell { /// cannot be delivered upstream. It is the upstream's responsibility to /// detect and handle crashes. pub fn send_supervision_event_or_crash(&self, event: ActorSupervisionEvent) { - if let Err(err) = self.inner.supervision_port.send(event) { - tracing::error!( - "{}: failed to send supervision event to actor: {:?}. Crash the process.", - self.actor_id(), - err - ); - - std::process::exit(1); + match &self.inner.actor_loop { + Some((_, supervision_port)) => { + if let Err(err) = supervision_port.send(event) { + tracing::error!( + "{}: failed to send supervision event to actor: {:?}. 
Crash the process.", + self.actor_id(), + err + ); + std::process::exit(1); + } + } + None => { + tracing::error!( + "{}: failed: {}: cannot send supervision event to detached actor: crashing", + self.actor_id(), + event, + ); + std::process::exit(1); + } } } @@ -1721,6 +1789,7 @@ mod tests { use crate::HandleClient; use crate::Handler; use crate::OncePortRef; + use crate::PortRef; use crate::clock::RealClock; use crate::test_utils::proc_supervison::ProcSupervisionCoordinator; use crate::test_utils::process_assertion::assert_termination; @@ -2537,6 +2606,55 @@ mod tests { ); } + #[tokio::test] + async fn test_instance() { + #[derive(Debug)] + struct TestActor; + + #[async_trait] + impl Actor for TestActor { + type Params = (); + + async fn new(param: ()) -> Result { + Ok(Self) + } + } + + #[async_trait] + impl Handler<(String, PortRef)> for TestActor { + async fn handle( + &mut self, + cx: &crate::Context, + (message, port): (String, PortRef), + ) -> anyhow::Result<()> { + port.send(cx, message)?; + Ok(()) + } + } + + let proc = Proc::local(); + + let (instance, handle) = proc.instance("my_test_actor").unwrap(); + + let child_actor = TestActor::spawn(&instance, ()).await.unwrap(); + + let (port, mut receiver) = instance.open_port(); + child_actor + .send(("hello".to_string(), port.bind())) + .unwrap(); + + let message = receiver.recv().await.unwrap(); + assert_eq!(message, "hello"); + + child_actor.drain_and_stop().unwrap(); + child_actor.await; + + assert_eq!(*handle.status().borrow(), ActorStatus::Client); + drop(instance); + assert_eq!(*handle.status().borrow(), ActorStatus::Stopped); + handle.await; + } + #[tokio::test] async fn test_proc_terminate_without_coordinator() { if std::env::var("CARGO_TEST").is_ok() { diff --git a/hyperactor/src/supervision.rs b/hyperactor/src/supervision.rs index fbb6922fa..4a1e527b4 100644 --- a/hyperactor/src/supervision.rs +++ b/hyperactor/src/supervision.rs @@ -8,6 +8,7 @@ //! Messages used in supervision. 
+use std::fmt; use std::fmt::Debug; use derivative::Derivative; @@ -32,3 +33,15 @@ pub struct ActorSupervisionEvent { #[derivative(PartialEq = "ignore")] pub message_headers: Option, } + +impl fmt::Display for ActorSupervisionEvent { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{}: {}", self.actor_id, self.actor_status)?; + if let Some(message_headers) = &self.message_headers { + let headers = serde_json::to_string(&message_headers) + .expect("could not serialize message headers"); + write!(f, " headers: {}", headers)?; + } + Ok(()) + } +} From 7e7f89a638f63ce47a93ae7bfdfb21ef2f67767e Mon Sep 17 00:00:00 2001 From: Marius Eriksen Date: Wed, 6 Aug 2025 08:57:11 -0700 Subject: [PATCH 2/2] simplify supervision propagation; unhandled events always cause failure (#773) Summary: Pull Request resolved: https://github.com/pytorch-labs/monarch/pull/773 Currently (local) supervision can propagate events without also killing an intermediate actor. This is 1) wrong; and 2) complicated. Instead, we treat an unhandled supervision event as an actor failure, and then reduce the propagation paths to one: that of an actor failing. In order to retain accurate attribution, we add a "caused_by" field to the actor supervision events. Reviewed By: shayne-fletcher Differential Revision: D79702385 --- hyperactor/src/actor.rs | 17 +- hyperactor/src/mailbox/undeliverable.rs | 1 + hyperactor/src/proc.rs | 211 +++++++++++++++---- hyperactor/src/supervision.rs | 4 + hyperactor/src/test_utils/proc_supervison.rs | 2 +- hyperactor_mesh/src/proc_mesh.rs | 1 + 6 files changed, 190 insertions(+), 46 deletions(-) diff --git a/hyperactor/src/actor.rs b/hyperactor/src/actor.rs index 34398bf93..c1b2e164b 100644 --- a/hyperactor/src/actor.rs +++ b/hyperactor/src/actor.rs @@ -260,8 +260,8 @@ where /// with the ID of the actor being served. 
#[derive(Debug)] pub struct ActorError { - actor_id: ActorId, - kind: ActorErrorKind, + pub(crate) actor_id: ActorId, + pub(crate) kind: ActorErrorKind, } /// The kinds of actor serving errors. @@ -300,6 +300,10 @@ pub enum ActorErrorKind { #[error("actor is in an indeterminate state")] IndeterminateState, + /// An actor supervision event was not handled. + #[error("supervision: {0}")] + UnhandledSupervisionEvent(#[from] ActorSupervisionEvent), + /// A special kind of error that allows us to clone errors: we can keep the /// error string, but we lose the error structure. #[error("{0}")] @@ -349,6 +353,15 @@ impl From for ActorError { } } +impl From for ActorError { + fn from(inner: ActorSupervisionEvent) -> Self { + Self::new( + inner.actor_id.clone(), + ActorErrorKind::UnhandledSupervisionEvent(inner), + ) + } +} + /// A collection of signals to control the behavior of the actor. /// Signals are internal runtime control plane messages and should not be /// sent outside of the runtime. diff --git a/hyperactor/src/mailbox/undeliverable.rs b/hyperactor/src/mailbox/undeliverable.rs index 3e4a4fcfd..5c1034c67 100644 --- a/hyperactor/src/mailbox/undeliverable.rs +++ b/hyperactor/src/mailbox/undeliverable.rs @@ -164,6 +164,7 @@ pub fn supervise_undeliverable_messages( envelope )), message_headers: Some(envelope.headers().clone()), + caused_by: None, }) .is_err() { diff --git a/hyperactor/src/proc.rs b/hyperactor/src/proc.rs index bfa72acee..af75d844f 100644 --- a/hyperactor/src/proc.rs +++ b/hyperactor/src/proc.rs @@ -964,13 +964,30 @@ impl Instance { let result = self.run_actor_tree(&mut actor, actor_loop_receivers).await; - let actor_status = match result { - Ok(_) => ActorStatus::Stopped, - Err(err) => ActorStatus::Failed(err.to_string()), + let (actor_status, event) = match result { + Ok(_) => (ActorStatus::Stopped, None), + Err(ActorError { + kind: ActorErrorKind::UnhandledSupervisionEvent(event), + .. 
+ }) => (event.actor_status.clone(), Some(event)), + Err(err) => ( + ActorStatus::Failed(err.to_string()), + Some(ActorSupervisionEvent { + actor_id: self.cell.actor_id().clone(), + actor_status: ActorStatus::Failed(err.to_string()), + message_headers: None, + caused_by: None, + }), + ), }; - let result = self.cell.maybe_unlink_parent(); - if let Some(parent) = result { + if let Some(parent) = self.cell.maybe_unlink_parent() { + if let Some(event) = event { + // Parent exists, failure should be propagated to the parent. + parent.send_supervision_event_or_crash(event); + } + // TODO: we should get rid of this signal, and use *only* supervision events for + // the purpose of conveying lifecycle changes if let Err(err) = parent.signal(Signal::ChildStopped(self.cell.pid())) { tracing::error!( "{}: failed to send stop message to parent pid {}: {:?}", @@ -979,26 +996,14 @@ impl Instance { err ); } - if actor_status.is_failed() { - // Parent exists, failure should be propagated to the parent. - parent.send_supervision_event_or_crash(ActorSupervisionEvent { - actor_id: self.cell.actor_id().clone(), - actor_status: actor_status.clone(), - message_headers: None, - }); - } } else { // Failure happened to the root actor or orphaned child actors. // In either case, the failure should be propagated to proc. // // Note that orphaned actor is unexpected and would only happen if // there is a bug. 
- if actor_status.is_failed() { - self.proc.handle_supervision_event(ActorSupervisionEvent { - actor_id: self.cell.actor_id().clone(), - actor_status: actor_status.clone(), - message_headers: None, - }) + if let Some(event) = event { + self.proc.handle_supervision_event(event); } } self.change_status(actor_status); @@ -1103,7 +1108,7 @@ impl Instance { let work = work.expect("inconsistent work queue state"); if let Err(err) = work.handle(actor, self).await { for supervision_event in supervision_event_receiver.drain() { - self.handle_supervision_event(actor, supervision_event).await; + self.handle_supervision_event(actor, supervision_event).await?; } return Err(ActorError::new(self.self_id().clone(), ActorErrorKind::Processing(err))); } @@ -1122,7 +1127,7 @@ impl Instance { } } Ok(supervision_event) = supervision_event_receiver.recv() => { - self.handle_supervision_event(actor, supervision_event).await; + self.handle_supervision_event(actor, supervision_event).await?; } } self.cell @@ -1154,24 +1159,41 @@ impl Instance { &self, actor: &mut A, supervision_event: ActorSupervisionEvent, - ) { + ) -> Result<(), ActorError> { // Handle the supervision event with the current actor. - if let Ok(false) = actor + match actor .handle_supervision_event(self, &supervision_event) .await { - // The supervision event wasn't handled by this actor, try to bubble it up. - let result = self.cell.get_parent_cell(); - if let Some(parent) = result { - parent.send_supervision_event_or_crash(supervision_event); - } else { - // Reaching here means the actor is either a root actor, or an orphaned - // child actor (i.e. the parent actor was dropped unexpectedly). In either - // case, the supervision event should be sent to proc. - // - // Note that orphaned actor is unexpected and would only happen if there - // is a bug. - self.proc.handle_supervision_event(supervision_event); + Ok(true) => { + // The supervision event was handled by this actor, nothing more to do. 
+ Ok(()) + } + Ok(false) => { + // The supervision event wasn't handled by this actor, chain it and bubble it up. + let supervision_event = ActorSupervisionEvent { + actor_id: self.self_id().clone(), + actor_status: ActorStatus::Failed( + "did not handle supervision event".to_string(), + ), + message_headers: None, + caused_by: Some(Box::new(supervision_event)), + }; + Err(supervision_event.into()) + } + Err(err) => { + // The actor failed to handle the supervision event, it should die. + // Create a new supervision event for this failure and propagate it. + let supervision_event = ActorSupervisionEvent { + actor_id: self.self_id().clone(), + actor_status: ActorStatus::Failed(format!( + "failed to handle supervision event: {}", + err + )), + message_headers: None, + caused_by: Some(Box::new(supervision_event)), + }; + Err(supervision_event.into()) } } } @@ -2174,12 +2196,12 @@ mod tests { // TODO: should we provide finer-grained stop reasons, e.g., to indicate it was // stopped by a parent failure? - assert_matches!(root_2_1.await, ActorStatus::Stopped); - - for actor in [root_1, root] { - // The other actors were unaffected. 
- assert_matches!(*actor.status().borrow(), ActorStatus::Idle); - } + assert_eq!( + root.await, + ActorStatus::Failed("did not handle supervision event".to_string()) + ); + assert_eq!(root_2_1.await, ActorStatus::Stopped); + assert_eq!(root_1.await, ActorStatus::Stopped); } #[tokio::test] @@ -2602,7 +2624,7 @@ mod tests { assert!(!root_2_1_state.load(Ordering::SeqCst)); assert_eq!( reported_event.event().map(|e| e.actor_id.clone()), - Some(root_2_1.actor_id().clone()) + Some(root.actor_id().clone()) ); } @@ -2615,7 +2637,7 @@ mod tests { impl Actor for TestActor { type Params = (); - async fn new(param: ()) -> Result { + async fn new(_param: ()) -> Result { Ok(Self) } } @@ -2655,6 +2677,109 @@ mod tests { handle.await; } + #[tokio::test] + async fn test_supervision_event_handler_propagates() { + #[derive(Debug)] + struct FailingSupervisionActor; + + #[async_trait] + impl Actor for FailingSupervisionActor { + type Params = (); + + async fn new(_: ()) -> Result { + Ok(Self) + } + + async fn handle_supervision_event( + &mut self, + _this: &Instance, + _event: &ActorSupervisionEvent, + ) -> Result { + anyhow::bail!("failed to handle supervision event!") + } + } + + #[async_trait] + impl Handler for FailingSupervisionActor { + async fn handle( + &mut self, + _cx: &crate::Context, + message: String, + ) -> anyhow::Result<()> { + Err(anyhow::anyhow!(message)) + } + } + + #[derive(Debug)] + struct ParentActor(tokio::sync::mpsc::UnboundedSender); + + #[async_trait] + impl Actor for ParentActor { + type Params = tokio::sync::mpsc::UnboundedSender; + + async fn new( + supervision_events: tokio::sync::mpsc::UnboundedSender, + ) -> Result { + Ok(Self(supervision_events)) + } + + async fn handle_supervision_event( + &mut self, + _this: &Instance, + event: &ActorSupervisionEvent, + ) -> Result { + self.0.send(event.clone()).unwrap(); + Ok(true) + } + } + + let proc = Proc::local(); + + let (event_tx, mut event_rx) = tokio::sync::mpsc::unbounded_channel(); + + let parent = 
proc.spawn::("parent", event_tx).await.unwrap(); + let child = proc + .spawn_child::(parent.cell().clone(), ()) + .await + .unwrap(); + let grandchild = proc + .spawn_child::(child.cell().clone(), ()) + .await + .unwrap(); + + let child_actor_id = child.actor_id().clone(); + let grandchild_actor_id = grandchild.actor_id().clone(); + + // Grandchild fails, triggering failure up the tree, finally receiving + // the event at the root. + grandchild.send("trigger failure".to_string()).unwrap(); + + assert!(grandchild.await.is_failed()); + assert!(child.await.is_failed()); + + assert_eq!( + event_rx.recv().await.unwrap(), + ActorSupervisionEvent { + actor_id: child_actor_id, + actor_status: ActorStatus::Failed( + "failed to handle supervision event: failed to handle supervision event!" + .to_string() + ), + message_headers: None, + caused_by: Some(Box::new(ActorSupervisionEvent { + actor_id: grandchild_actor_id, + actor_status: ActorStatus::Failed( + "serving local[0].parent[2]: processing error: trigger failure".to_string() + ), + message_headers: None, + caused_by: None, + })), + } + ); + + assert!(event_rx.try_recv().is_err()); + } + #[tokio::test] async fn test_proc_terminate_without_coordinator() { if std::env::var("CARGO_TEST").is_ok() { diff --git a/hyperactor/src/supervision.rs b/hyperactor/src/supervision.rs index 4a1e527b4..dffcab954 100644 --- a/hyperactor/src/supervision.rs +++ b/hyperactor/src/supervision.rs @@ -32,8 +32,12 @@ pub struct ActorSupervisionEvent { /// If this event is associated with a message, the message headers. #[derivative(PartialEq = "ignore")] pub message_headers: Option, + /// Optional supervision event that caused this event, for recursive propagation. 
+ pub caused_by: Option>, } +impl std::error::Error for ActorSupervisionEvent {} + impl fmt::Display for ActorSupervisionEvent { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { write!(f, "{}: {}", self.actor_id, self.actor_status)?; diff --git a/hyperactor/src/test_utils/proc_supervison.rs b/hyperactor/src/test_utils/proc_supervison.rs index d83da81d2..6e40309c6 100644 --- a/hyperactor/src/test_utils/proc_supervison.rs +++ b/hyperactor/src/test_utils/proc_supervison.rs @@ -45,7 +45,7 @@ impl ProcSupervisionCoordinator { let coordinator = proc .spawn::("coordinator", state.clone()) .await?; - proc.set_supervision_coordinator(coordinator.port::())?; + proc.set_supervision_coordinator(coordinator.port())?; Ok(state) } } diff --git a/hyperactor_mesh/src/proc_mesh.rs b/hyperactor_mesh/src/proc_mesh.rs index 01abb7f1a..dc0e00e36 100644 --- a/hyperactor_mesh/src/proc_mesh.rs +++ b/hyperactor_mesh/src/proc_mesh.rs @@ -593,6 +593,7 @@ impl ProcEvents { actor_id: proc_id.actor_id("any", 0), actor_status: ActorStatus::Failed(format!("proc {} is stopped", proc_id)), message_headers: None, + caused_by: None, }; if entry.value().send(event).is_err() { tracing::warn!("unable to transmit supervision event to actor {}", entry.key());