From 756116d6ebd130894d52495d373b6c9d5cd37c00 Mon Sep 17 00:00:00 2001 From: Marius Eriksen Date: Wed, 6 Aug 2025 08:08:45 -0700 Subject: [PATCH] "inverted control" mode through Proc::instance (#772) Summary: Adds Proc::instance() which returns an actor instance and its corresponding handler. This allows the user to create a regular actor without any message handlers. The returned `Instance` provides all the normal capabilities, including sending and receiving messages, being able to spawn and manage child actors, etc. This is the foundation for a kind of "script mode" actor. Reviewed By: shayne-fletcher Differential Revision: D79685752 --- hyperactor/src/actor.rs | 23 +++- hyperactor/src/proc.rs | 216 ++++++++++++++++++++++++++-------- hyperactor/src/supervision.rs | 13 ++ 3 files changed, 202 insertions(+), 50 deletions(-) diff --git a/hyperactor/src/actor.rs b/hyperactor/src/actor.rs index 0b046fcef..34398bf93 100644 --- a/hyperactor/src/actor.rs +++ b/hyperactor/src/actor.rs @@ -135,6 +135,17 @@ pub trait Actor: Sized + Send + Debug + 'static { } } +/// An actor that does nothing. It is used to represent "client only" actors, +/// returned by [`Proc::instance`]. +#[async_trait] +impl Actor for () { + type Params = (); + + async fn new(params: Self::Params) -> Result { + Ok(params) + } +} + /// A Handler allows an actor to handle a specific message type. #[async_trait] pub trait Handler: Actor { @@ -356,6 +367,16 @@ pub enum Signal { ChildStopped(Index), } +impl fmt::Display for Signal { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Signal::DrainAndStop => write!(f, "DrainAndStop"), + Signal::Stop => write!(f, "Stop"), + Signal::ChildStopped(index) => write!(f, "ChildStopped({})", index), + } + } +} + /// The runtime status of an actor. #[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone, Named)] pub enum ActorStatus { @@ -390,7 +411,7 @@ pub enum ActorStatus { impl ActorStatus { /// Tells whether the status is a terminal state. - fn is_terminal(&self) -> bool { + pub(crate) fn is_terminal(&self) -> bool { matches!(self, Self::Stopped | Self::Failed(_)) } diff --git a/hyperactor/src/proc.rs b/hyperactor/src/proc.rs index 2e58a1f00..bfa72acee 100644 --- a/hyperactor/src/proc.rs +++ b/hyperactor/src/proc.rs @@ -460,7 +460,7 @@ impl Proc { actor_type = std::any::type_name::(), actor_id = actor_id.to_string(), ); - let instance = Instance::new(self.clone(), actor_id.clone(), None); + let instance = Instance::new(self.clone(), actor_id.clone(), false, None); let actor = A::new(params).await?; // Add this actor to the proc's actor ledger. We do not actively remove // inactive actors from ledger, because the actor's state can be inferred @@ -472,6 +472,28 @@ impl Proc { instance.start(actor).await } + /// Create and return an actor instance and its corresponding handle. This allows actors to be + /// "inverted": the caller can use the returned [`Instance`] to send and receive messages, + /// launch child actors, etc. The actor itself does not handle any messages, and supervision events + /// are always forwarded to the proc. Otherwise the instance acts as a normal actor, and can be + /// referenced and stopped. + pub fn instance(&self, name: &str) -> Result<(Instance<()>, ActorHandle<()>), anyhow::Error> { + let actor_id = self.allocate_root_id(name)?; + let _ = tracing::debug_span!( + "actor_instance", + actor_name = name, + actor_type = std::any::type_name::<()>(), + actor_id = actor_id.to_string(), + ); + + let instance = Instance::new(self.clone(), actor_id.clone(), true, None); + let handle = ActorHandle::new(instance.cell.clone(), instance.ports.clone()); + + instance.change_status(ActorStatus::Client); + + Ok((instance, handle)) + } + /// Spawn a child actor from the provided parent on this proc. The parent actor /// must already belong to this proc, a fact which is asserted in code. /// @@ -483,7 +505,7 @@ impl Proc { params: A::Params, ) -> Result, anyhow::Error> { let actor_id = self.allocate_child_id(parent.actor_id())?; - let instance = Instance::new(self.clone(), actor_id, Some(parent.clone())); + let instance = Instance::new(self.clone(), actor_id, false, Some(parent.clone())); let actor = A::new(params).await?; instance.start(actor).await } @@ -773,13 +795,8 @@ pub struct Instance { /// The mailbox associated with the actor. mailbox: Mailbox, - /// The actor's signal receiver. This is used to - /// receive signals sent to the actor. - signal_receiver: PortReceiver, - - /// The actor's supervision event receiver. This is used to receive supervision - /// event from the parent. - supervision_event_receiver: PortReceiver, + /// Receivers for the actor loop, if available. + actor_loop_receivers: Option<(PortReceiver, PortReceiver)>, ports: Arc>, @@ -797,19 +814,17 @@ pub struct Instance { impl Instance { /// Create a new actor instance in Created state. - pub(crate) fn new(proc: Proc, actor_id: ActorId, parent: Option) -> Self { + pub(crate) fn new( + proc: Proc, + actor_id: ActorId, + detached: bool, + parent: Option, + ) -> Self { // Set up messaging let mailbox = Mailbox::new(actor_id.clone(), BoxedMailboxSender::new(proc.downgrade())); let (work_tx, work_rx) = mpsc::unbounded_channel(); - let ports: Arc> = Arc::new(Ports::new(mailbox.clone(), work_tx)); - let (signal_port, signal_receiver) = ports.open_message_port().unwrap(); - proc.state().proc_muxer.bind_mailbox(mailbox.clone()); - - // Supervision event port is used locally in the proc - let (supervision_port, supervision_event_receiver) = mailbox.open_port(); - let (status_tx, status_rx) = watch::channel(ActorStatus::Created); let actor_type = match TypeInfo::of::() { @@ -818,12 +833,24 @@ impl Instance { }; let ais = actor_id.to_string(); + let actor_loop_ports = if detached { + None + } else { + let (signal_port, signal_receiver) = ports.open_message_port().unwrap(); + let (supervision_port, supervision_receiver) = mailbox.open_port(); + Some(( + (signal_port, supervision_port), + (signal_receiver, supervision_receiver), + )) + }; + + let (actor_loop, actor_loop_receivers) = actor_loop_ports.unzip(); + let cell = InstanceCell::new( actor_id, actor_type, proc.clone(), - signal_port, - supervision_port, + actor_loop, status_rx, parent, ports.clone(), @@ -834,8 +861,7 @@ impl Instance { proc, cell, mailbox, - signal_receiver, - supervision_event_receiver, + actor_loop_receivers, ports, work_rx, status_tx, @@ -934,7 +960,9 @@ impl Instance { } async fn serve(mut self, mut actor: A) { - let result = self.run_actor_tree(&mut actor).await; + let actor_loop_receivers = self.actor_loop_receivers.take().unwrap(); + + let result = self.run_actor_tree(&mut actor, actor_loop_receivers).await; let actor_status = match result { Ok(_) => ActorStatus::Stopped, @@ -978,13 +1006,20 @@ impl Instance { /// Runs the actor, and manages its supervision tree. When the function returns, /// the whole tree rooted at this actor has stopped. - async fn run_actor_tree(&mut self, actor: &mut A) -> Result<(), ActorError> { + async fn run_actor_tree( + &mut self, + actor: &mut A, + mut actor_loop_receivers: (PortReceiver, PortReceiver), + ) -> Result<(), ActorError> { // It is okay to catch all panics here, because we are in a tokio task, // and tokio will catch the panic anyway: // https://docs.rs/tokio/latest/tokio/task/struct.JoinError.html#method.is_panic // What we do here is just to catch it early so we can handle it. - let result = match AssertUnwindSafe(self.run(actor)).catch_unwind().await { + let result = match AssertUnwindSafe(self.run(actor, &mut actor_loop_receivers)) + .catch_unwind() + .await + { Ok(result) => result, Err(err) => { // This is only the error message. Backtrace is not included. @@ -1027,8 +1062,9 @@ impl Instance { self.cell.unlink(&child); } + let (mut signal_receiver, _) = actor_loop_receivers; while self.cell.child_count() > 0 { - match self.signal_receiver.recv().await? { + match signal_receiver.recv().await? { Signal::ChildStopped(pid) => { assert!(self.cell.get_child(pid).is_none()); } @@ -1040,9 +1076,15 @@ impl Instance { } /// Initialize and run the actor until it fails or is stopped. - async fn run(&mut self, actor: &mut A) -> Result<(), ActorError> { + async fn run( + &mut self, + actor: &mut A, + actor_loop_receivers: &mut (PortReceiver, PortReceiver), + ) -> Result<(), ActorError> { tracing::debug!("entering actor loop: {}", self.self_id()); + let (signal_receiver, supervision_event_receiver) = actor_loop_receivers; + self.change_status(ActorStatus::Initializing); actor .init(self) @@ -1060,13 +1102,13 @@ impl Instance { let _ = ACTOR_MESSAGE_HANDLER_DURATION.start(metric_pairs); let work = work.expect("inconsistent work queue state"); if let Err(err) = work.handle(actor, self).await { - for supervision_event in self.supervision_event_receiver.drain() { + for supervision_event in supervision_event_receiver.drain() { self.handle_supervision_event(actor, supervision_event).await; } return Err(ActorError::new(self.self_id().clone(), ActorErrorKind::Processing(err))); } } - signal = self.signal_receiver.recv() => { + signal = signal_receiver.recv() => { let signal = signal.map_err(ActorError::from); tracing::debug!("Received signal {signal:?}"); match signal? { @@ -1079,7 +1121,7 @@ impl Instance { }, } } - Ok(supervision_event) = self.supervision_event_receiver.recv() => { + Ok(supervision_event) = supervision_event_receiver.recv() => { self.handle_supervision_event(actor, supervision_event).await; } } @@ -1204,6 +1246,19 @@ impl Instance { } } +impl Drop for Instance { + fn drop(&mut self) { + self.status_tx.send_if_modified(|status| { + if status.is_terminal() { + false + } else { + *status = ActorStatus::Stopped; + true + } + }); + } +} + impl cap::sealed::CanSend for Instance { fn post(&self, dest: PortId, headers: Attrs, data: Serialized) { let envelope = MessageEnvelope::new(self.self_id().clone(), dest, data, headers); @@ -1339,13 +1394,8 @@ struct InstanceState { /// The proc in which the actor is running. proc: Proc, - /// The actor's signal port. This is used to send - /// signals to the actor. - signal: PortHandle, - - /// The actor's supervision port. This is used to send - /// supervision event to the actor (usually by its children). - supervision_port: PortHandle, + /// Control port handles to the actor loop, if one is running. + actor_loop: Option<(PortHandle, PortHandle)>, /// An observer that stores the current status of the actor. status: watch::Receiver, @@ -1397,8 +1447,7 @@ impl InstanceCell { actor_id: ActorId, actor_type: ActorType, proc: Proc, - signal: PortHandle, - supervision_port: PortHandle, + actor_loop: Option<(PortHandle, PortHandle)>, status: watch::Receiver, parent: Option, ports: Arc, @@ -1409,8 +1458,7 @@ impl InstanceCell { actor_id: actor_id.clone(), actor_type, proc: proc.clone(), - signal, - supervision_port, + actor_loop, status, parent: parent.map_or_else(WeakInstanceCell::new, |cell| cell.downgrade()), children: DashMap::new(), @@ -1455,7 +1503,16 @@ impl InstanceCell { /// Send a signal to the actor. #[allow(clippy::result_large_err)] // TODO: Consider reducing the size of `ActorError`. pub fn signal(&self, signal: Signal) -> Result<(), ActorError> { - self.inner.signal.send(signal).map_err(ActorError::from) + if let Some((signal_port, _)) = &self.inner.actor_loop { + signal_port.send(signal).map_err(ActorError::from) + } else { + tracing::warn!( + "{}: attempted to send signal {} to detached actor", + self.inner.actor_id, + signal + ); + Ok(()) + } } /// Used by this actor's children to send a supervision event to this actor. @@ -1467,14 +1524,25 @@ impl InstanceCell { /// cannot be delivered upstream. It is the upstream's responsibility to /// detect and handle crashes. pub fn send_supervision_event_or_crash(&self, event: ActorSupervisionEvent) { - if let Err(err) = self.inner.supervision_port.send(event) { - tracing::error!( - "{}: failed to send supervision event to actor: {:?}. Crash the process.", - self.actor_id(), - err - ); - - std::process::exit(1); + match &self.inner.actor_loop { + Some((_, supervision_port)) => { + if let Err(err) = supervision_port.send(event) { + tracing::error!( + "{}: failed to send supervision event to actor: {:?}. Crash the process.", + self.actor_id(), + err + ); + std::process::exit(1); + } + } + None => { + tracing::error!( + "{}: failed: {}: cannot send supervision event to detached actor: crashing", + self.actor_id(), + event, + ); + std::process::exit(1); + } } } @@ -1721,6 +1789,7 @@ mod tests { use crate::HandleClient; use crate::Handler; use crate::OncePortRef; + use crate::PortRef; use crate::clock::RealClock; use crate::test_utils::proc_supervison::ProcSupervisionCoordinator; use crate::test_utils::process_assertion::assert_termination; @@ -2537,6 +2606,55 @@ mod tests { ); } + #[tokio::test] + async fn test_instance() { + #[derive(Debug)] + struct TestActor; + + #[async_trait] + impl Actor for TestActor { + type Params = (); + + async fn new(param: ()) -> Result { + Ok(Self) + } + } + + #[async_trait] + impl Handler<(String, PortRef)> for TestActor { + async fn handle( + &mut self, + cx: &crate::Context, + (message, port): (String, PortRef), + ) -> anyhow::Result<()> { + port.send(cx, message)?; + Ok(()) + } + } + + let proc = Proc::local(); + + let (instance, handle) = proc.instance("my_test_actor").unwrap(); + + let child_actor = TestActor::spawn(&instance, ()).await.unwrap(); + + let (port, mut receiver) = instance.open_port(); + child_actor + .send(("hello".to_string(), port.bind())) + .unwrap(); + + let message = receiver.recv().await.unwrap(); + assert_eq!(message, "hello"); + + child_actor.drain_and_stop().unwrap(); + child_actor.await; + + assert_eq!(*handle.status().borrow(), ActorStatus::Client); + drop(instance); + assert_eq!(*handle.status().borrow(), ActorStatus::Stopped); + handle.await; + } + #[tokio::test] async fn test_proc_terminate_without_coordinator() { if std::env::var("CARGO_TEST").is_ok() { diff --git a/hyperactor/src/supervision.rs b/hyperactor/src/supervision.rs index fbb6922fa..4a1e527b4 100644 --- a/hyperactor/src/supervision.rs +++ b/hyperactor/src/supervision.rs @@ -8,6 +8,7 @@ //! Messages used in supervision. +use std::fmt; use std::fmt::Debug; use derivative::Derivative; @@ -32,3 +33,15 @@ pub struct ActorSupervisionEvent { #[derivative(PartialEq = "ignore")] pub message_headers: Option, } + +impl fmt::Display for ActorSupervisionEvent { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{}: {}", self.actor_id, self.actor_status)?; + if let Some(message_headers) = &self.message_headers { + let headers = serde_json::to_string(&message_headers) + .expect("could not serialize message headers"); + write!(f, " headers: {}", headers)?; + } + Ok(()) + } +}