diff --git a/Cargo.lock b/Cargo.lock index 69720a9..b81d7fc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1042,7 +1042,7 @@ dependencies = [ name = "amico" version = "0.0.1" dependencies = [ - "amico-core 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)", + "amico-core 1.0.2", "amico-mods", "amico-sdk 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)", "anyhow", @@ -1064,6 +1064,8 @@ dependencies = [ [[package]] name = "amico-core" version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a69124ffec3a019204b9dc5a56449a468d28e0b4b3974f27def0f6a1ea85d325" dependencies = [ "anyhow", "chrono", @@ -1074,14 +1076,11 @@ dependencies = [ "tokio", "tokio_with_wasm", "tracing", - "tracing-subscriber", ] [[package]] name = "amico-core" -version = "1.0.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a69124ffec3a019204b9dc5a56449a468d28e0b4b3974f27def0f6a1ea85d325" +version = "1.1.0" dependencies = [ "anyhow", "chrono", @@ -1092,6 +1091,7 @@ dependencies = [ "tokio", "tokio_with_wasm", "tracing", + "tracing-subscriber", ] [[package]] @@ -1125,7 +1125,7 @@ name = "amico-mods" version = "0.0.4" dependencies = [ "alloy", - "amico-core 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)", + "amico-core 1.0.2", "amico-hal 0.0.2 (registry+https://github.com/rust-lang/crates.io-index)", "amico-sdk 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)", "anyhow", diff --git a/amico-core/Cargo.toml b/amico-core/Cargo.toml index 0abf6b5..7a09fe8 100644 --- a/amico-core/Cargo.toml +++ b/amico-core/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "amico-core" -version = "1.0.2" +version = "1.1.0" edition = "2024" description = "The core Agent components of the Amico AI Agent Framework" repository = "https://github.com/AIMOverse/amico" diff --git a/amico-core/src/agent.rs b/amico-core/src/agent.rs index 0293d74..7a781b8 100644 --- a/amico-core/src/agent.rs +++ b/amico-core/src/agent.rs @@ -3,7 +3,7 @@ use tokio_with_wasm::alias as tokio; use crate::{ traits::{EventSource, Strategy, System}, - types::{AgentEvent, EventContent, Instruction}, + types::{AgentEvent, Control, EventContent}, world::WorldManager, }; @@ -21,10 +21,10 @@ use crate::{ /// - WASM: compatible. pub struct Agent { /// The mpsc channel sender to send agent events to event sources. - event_tx: Sender, + event_tx: Sender, /// The mpsc channel receiver to receive agent events from event sources. - event_rx: Receiver, + event_rx: Receiver, /// The ECS world manager. wm: WorldManager, @@ -67,16 +67,40 @@ impl Agent { // Spawn the thread. let jh = event_source.spawn(move |event| { tracing::debug!("On AgentEvent {:?}", event); - let tx = event_tx.clone(); + let event_tx = event_tx.clone(); async move { let name = event.name; - if let Err(err) = tx.send(event).await { + // Create a new channel for the reply message. + let (tx, mut rx) = channel(1); + + if let Err(err) = event_tx + .send(EventWithTx { + tx: Some(tx), + event, + }) + .await + { tracing::warn!("Failed to send AgentEvent {}", err); } else { tracing::info!("Sent AgentEvent {}", name); } + + // Wait and return the reply message. + rx.recv() + .await + .inspect(|reply| { + if let Some(reply) = reply { + tracing::debug!("Received reply message: {:?}", reply); + } else { + tracing::debug!("Received no reply message"); + } + }) + .unwrap_or_else(|| { + tracing::error!("Failed to receive reply message: channel closed"); + None + }) } }); @@ -93,12 +117,18 @@ impl Agent { } // Send a termination instruction to signal the main loop to exit - let terminate_event = AgentEvent::new("Terminate", "spawn_event_source") - .instruction(Instruction::Terminate); + let terminate_event = + AgentEvent::new("Terminate", "spawn_event_source").control(Control::Quit); // Try to send the termination event, but don't panic if it fails // (channel might already be closed) - if let Err(err) = event_tx.send(terminate_event).await { + if let Err(err) = event_tx + .send(EventWithTx { + tx: None, + event: terminate_event, + }) + .await + { tracing::warn!("Failed to send termination event: {}", err); } }); @@ -117,28 +147,38 @@ impl Agent { /// `run` dispatches `AgentEvent`s into the ECS `World` based on the Agent's strategy. pub async fn run(&mut self) { // Listen for events sent by event sources. - while let Some(event) = self.event_rx.recv().await { + while let Some(event_with_tx) = self.event_rx.recv().await { + let EventWithTx { tx, event } = event_with_tx; tracing::debug!("Received AgentEvent {:?}", event); - if let Some(EventContent::Instruction(instruction)) = event.content { - // Received an instruction - tracing::debug!("Received instruction {:?}", instruction); - match instruction { + if let Some(EventContent::Control(control)) = event.content { + // Received a control instruction + tracing::debug!("Received control instruction {:?}", control); + match control { // TODO: process other instructions - Instruction::Terminate => { - tracing::info!("Terminating event loop due to Terminate instruction"); + Control::Quit => { + tracing::info!("Terminating event loop due to Quit control instruction"); break; // Exit the event loop immediately } } } else { // The event is not an instruction, dispatch the event to the `World`. - tracing::debug!("Dispatching event {:?}", event); - if let Err(err) = self + tracing::debug!("Processing event {:?}", event); + let reply = self .strategy .deliberate(&event, self.wm.action_sender()) .await - { - tracing::error!("Error dispatching event {:?}: {}", event, err); + .unwrap_or_else(|err| { + // Report the error and return `None` to indicate no reply. + tracing::error!("Error processing event {:?}: {}", event, err); + None + }); + + // Send the reply message back to the event source if needed. + if let Some(tx) = tx { + if let Err(err) = tx.send(reply).await { + tracing::error!("Failed to send reply message: {}", err); + } } } } @@ -155,3 +195,9 @@ pub enum OnFinish { // Stop the Agent workflow when the thread finishes. Stop, } + +/// A struct for the reply message to send back to the event source. +struct EventWithTx { + tx: Option>>, + event: AgentEvent, +} diff --git a/amico-core/src/traits/event_source.rs b/amico-core/src/traits/event_source.rs index 9506614..4d60467 100644 --- a/amico-core/src/traits/event_source.rs +++ b/amico-core/src/traits/event_source.rs @@ -12,5 +12,5 @@ pub trait EventSource { fn spawn(&self, on_event: F) -> JoinHandle> where F: Fn(AgentEvent) -> Fut + Send + Sync + 'static, - Fut: Future + Send + 'static; + Fut: Future> + Send + 'static; } diff --git a/amico-core/src/traits/strategy.rs b/amico-core/src/traits/strategy.rs index 6ed57d6..7dce929 100644 --- a/amico-core/src/traits/strategy.rs +++ b/amico-core/src/traits/strategy.rs @@ -1,5 +1,3 @@ -use std::future::Future; - use crate::{types::AgentEvent, world::ActionSender}; /// The Agent's action selection strategy. @@ -7,9 +5,23 @@ use crate::{types::AgentEvent, world::ActionSender}; /// Actions for the Agent is one or several ECS events sent to the ECS `World`. pub trait Strategy { /// Responsible for selecting actions based on the `AgentEvent` received. + /// + /// The returned **reply** type `String` should not be confused with **the result** + /// of the deliberation process. The agent may generate a reply message to the user, + /// especially when the `AgentEvent` is a `Interaction`. But in other cases, the + /// agent may not produce a reply message, unless it thinks replying to the user + /// is necessary. + /// + /// The result of the deliberation process is the `Action`s sent to the ECS `World`. + /// + /// # Returns + /// + /// - `Ok(None)`: The strategy does not want to produce a reply message. + /// - `Ok(Some(reply))`: The strategy wants to produce a reply message. + /// - `Err(e)`: The strategy is unable to process the `AgentEvent`. fn deliberate( &mut self, agent_event: &AgentEvent, sender: ActionSender, - ) -> impl Future>; + ) -> impl Future>>; } diff --git a/amico-core/src/types/agent_event.rs b/amico-core/src/types/agent_event.rs index 84f1266..156725b 100644 --- a/amico-core/src/types/agent_event.rs +++ b/amico-core/src/types/agent_event.rs @@ -6,7 +6,7 @@ use serde_json::Value; use crate::errors::AgentEventError; -use super::Instruction; +use super::{Control, Interaction}; /// Struct representing an event the agent receives. /// @@ -32,12 +32,12 @@ use super::Instruction; /// ## Create an event with instruction /// /// ``` -/// use amico_core::types::{AgentEvent, Instruction, EventContent}; +/// use amico_core::types::{AgentEvent, Control, EventContent}; /// /// let event = AgentEvent::new("test", "TestSource") -/// .instruction(Instruction::Terminate); +/// .control(Control::Quit); /// -/// assert_eq!(event.content, Some(EventContent::Instruction(Instruction::Terminate))); +/// assert_eq!(event.content, Some(EventContent::Control(Control::Quit))); /// ``` #[derive(Debug, Clone, Serialize, Deserialize)] pub struct AgentEvent { @@ -62,8 +62,14 @@ pub struct AgentEvent { /// Either some content value, or an instruction for the agent. #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] pub enum EventContent { + /// The serialized content data of the event. Content(Value), - Instruction(Instruction), + + /// A control instruction to the agent. + Control(Control), + + /// An interaction with the agent. + Interaction(Interaction), } impl AgentEvent { @@ -141,23 +147,23 @@ impl AgentEvent { } } - /// Adds an instruction to the event. + /// Adds a control instruction to the event. /// - /// Setting `instruction` will override any existing instruction or content. + /// Setting `control` will override any existing control or content. /// /// # Examples /// /// ``` - /// use amico_core::types::{AgentEvent, Instruction, EventContent}; + /// use amico_core::types::{AgentEvent, Control, EventContent}; /// /// let event = AgentEvent::new("test", "TestSource") - /// .instruction(Instruction::Terminate); + /// .control(Control::Quit); /// - /// assert_eq!(event.content, Some(EventContent::Instruction(Instruction::Terminate))); + /// assert_eq!(event.content, Some(EventContent::Control(Control::Quit))); /// ``` - pub fn instruction(self, instruction: Instruction) -> Self { + pub fn control(self, instruction: Control) -> Self { Self { - content: Some(EventContent::Instruction(instruction)), + content: Some(EventContent::Control(instruction)), ..self } } @@ -242,6 +248,55 @@ impl AgentEvent { None => Err(AgentEventError::ContentError("Content is None")), } } + + /// Adds an interaction to the event. + /// + /// Setting `interaction` will override any existing interaction, control, or content. + /// + /// # Examples + /// + /// ``` + /// use amico_core::types::{AgentEvent, Interaction, Chat, EventContent}; + /// + /// let event = AgentEvent::new("test", "TestSource") + /// .interaction(Chat::new().into_interaction()); + /// + /// assert_eq!(event.content, Some(EventContent::Interaction(Chat::new().into_interaction()))); + /// ``` + pub fn interaction(self, interaction: Interaction) -> Self { + Self { + content: Some(EventContent::Interaction(interaction)), + ..self + } + } + + /// Gets the interaction from the event. Returns `None` if the event does not contain an interaction. + /// + /// # Examples + /// + /// ``` + /// use amico_core::types::{AgentEvent, Interaction, Chat, EventContent, Control}; + /// + /// let event = AgentEvent::new("test", "TestSource") + /// .interaction(Chat::new().into_interaction()); + /// + /// let interaction = event.get_interaction(); + /// + /// assert_eq!(interaction, Some(&Chat::new().into_interaction())); + /// + /// let event = AgentEvent::new("test", "TestSource") + /// .control(Control::Quit); + /// + /// let interaction = event.get_interaction(); + /// + /// assert_eq!(interaction, None); + /// ``` + pub fn get_interaction(&self) -> Option<&Interaction> { + match &self.content { + Some(EventContent::Interaction(interaction)) => Some(interaction), + _ => None, + } + } } #[cfg(test)] diff --git a/amico-core/src/types/instruction.rs b/amico-core/src/types/control.rs similarity index 52% rename from amico-core/src/types/instruction.rs rename to amico-core/src/types/control.rs index ab8942e..f0bb9de 100644 --- a/amico-core/src/types/instruction.rs +++ b/amico-core/src/types/control.rs @@ -1,10 +1,10 @@ use serde::{Deserialize, Serialize}; -/// An instruction to the Agent, e.g. quit. +/// A control instruction to the Agent, e.g. quit. /// /// TODO: Define more instructions. #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] -pub enum Instruction { - /// Signal to terminate the agent event loop - Terminate, +pub enum Control { + /// Signal to quit the agent event loop + Quit, } diff --git a/amico-core/src/types/interaction.rs b/amico-core/src/types/interaction.rs new file mode 100644 index 0000000..08bb341 --- /dev/null +++ b/amico-core/src/types/interaction.rs @@ -0,0 +1,77 @@ +use serde::{Deserialize, Serialize}; + +/// An interaction with the Agent, e.g. a chat request. +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +pub enum Interaction { + /// A chat interaction. + Chat(Chat), +} + +/// The session ID type. +pub type SessionId = u64; + +/// A chat interaction context data. +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)] +pub struct Chat { + pub messages: Vec, + pub session_id: SessionId, +} + +/// A interaction chat message. +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)] +pub struct ChatMessage { + /// The message content. + pub content: String, + + /// The message role. + pub role: String, +} + +impl Chat { + /// Creates a new chat interaction. + pub fn new() -> Self { + Self { + messages: vec![], + session_id: 0, + } + } + + /// Sets the messages of the chat interaction. + pub fn messages(self, messages: Vec) -> Self { + Self { messages, ..self } + } + + /// Sets the session ID of the chat interaction. + pub fn session_id(self, session_id: SessionId) -> Self { + Self { session_id, ..self } + } + + /// Converts the chat interaction into an interaction. + pub fn into_interaction(self) -> Interaction { + Interaction::Chat(self) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_build_chat() { + let chat = Chat::new() + .messages(vec![ChatMessage { + content: "Hello, world!".to_string(), + role: "user".to_string(), + }]) + .session_id(1); + + assert_eq!( + chat.messages, + vec![ChatMessage { + content: "Hello, world!".to_string(), + role: "user".to_string(), + }] + ); + assert_eq!(chat.session_id, 1); + } +} diff --git a/amico-core/src/types/mod.rs b/amico-core/src/types/mod.rs index d9a0081..d3dc85f 100644 --- a/amico-core/src/types/mod.rs +++ b/amico-core/src/types/mod.rs @@ -1,5 +1,7 @@ mod agent_event; -mod instruction; +mod control; +mod interaction; pub use agent_event::*; -pub use instruction::*; +pub use control::*; +pub use interaction::*; diff --git a/amico-core/tests/strategy_interaction.rs b/amico-core/tests/strategy_interaction.rs new file mode 100644 index 0000000..ce5ede0 --- /dev/null +++ b/amico-core/tests/strategy_interaction.rs @@ -0,0 +1,81 @@ +use std::{future::Future, time::Duration}; + +use amico_core::{ + Agent, OnFinish, ecs, + traits::{EventSource, Strategy, System}, + types::{AgentEvent, Chat, Interaction}, +}; +use tokio::{task::JoinHandle, time::sleep}; +use tokio_with_wasm::alias as tokio; + +struct InteractionSource; + +impl EventSource for InteractionSource { + fn spawn(&self, on_event: F) -> JoinHandle> + where + F: Fn(AgentEvent) -> Fut + Send + Sync + 'static, + Fut: Future> + Send + 'static, + { + tokio::spawn(async move { + for i in 1..10 { + let event = AgentEvent::new("Chat", "InteractionSource") + .interaction(Chat::new().session_id(i).into_interaction()); + + let result = on_event(event).await.unwrap(); + + println!("Got reply: {}", result); + + // Simulate an asynchronous process. + sleep(Duration::from_millis(30 + i * 5)).await; + } + + Ok(()) + }) + } +} + +struct InteractionStrategy; + +impl Strategy for InteractionStrategy { + async fn deliberate( + &mut self, + agent_event: &AgentEvent, + mut sender: amico_core::world::ActionSender<'_>, + ) -> anyhow::Result> { + let interaction = agent_event.get_interaction().unwrap(); + + match interaction { + Interaction::Chat(chat) => { + sender.send(Log(format!("Received chat interaction: {:?}", chat))); + + // Simulate an asynchronous reply generation process. + sleep(Duration::from_millis(80)).await; + Ok(Some(format!("Session ID: {}", chat.session_id))) + } // _ => {} + } + } +} + +struct LogSystem; + +#[derive(amico_core::ecs::GlobalEvent)] +struct Log(String); + +impl System for LogSystem { + fn register_to(self, mut registry: amico_core::world::HandlerRegistry) { + registry.register(|r: ecs::Receiver| { + println!("log: {}", r.event.0); + }); + } +} + +#[tokio::test] +async fn test_agent() { + // tracing_subscriber::fmt::init(); + + let mut agent = Agent::new(InteractionStrategy); + agent.spawn_event_source(InteractionSource, OnFinish::Stop); + agent.add_system(LogSystem); + + agent.run().await; +} diff --git a/amico-core/tests/test_agent.rs b/amico-core/tests/test_agent.rs index 5481b57..5e2da03 100644 --- a/amico-core/tests/test_agent.rs +++ b/amico-core/tests/test_agent.rs @@ -21,7 +21,7 @@ impl EventSource for TestEventSource { fn spawn(&self, on_event: F) -> JoinHandle> where F: Fn(AgentEvent) -> Fut + Send + Sync + 'static, - Fut: Future + Send + 'static, + Fut: Future> + Send + 'static, { tokio::spawn(async move { for i in 1..10 { @@ -50,13 +50,13 @@ impl Strategy for TestStrategy { &mut self, agent_event: &AgentEvent, mut sender: amico_core::world::ActionSender<'_>, - ) -> anyhow::Result<()> { + ) -> anyhow::Result> { let EventInner { value, .. } = agent_event.parse_content::()?; sleep(Duration::from_millis(80)).await; sender.send(Tick(value)); - Ok(()) + Ok(None) } } @@ -75,7 +75,7 @@ impl System for TestSystem { #[tokio::test] async fn test_agent() { - tracing_subscriber::fmt::init(); + // tracing_subscriber::fmt::init(); let mut agent = Agent::new(TestStrategy); agent.spawn_event_source(TestEventSource, OnFinish::Stop); diff --git a/amico-wasm/Cargo.toml b/amico-wasm/Cargo.toml index 9894d0a..cb64a6c 100644 --- a/amico-wasm/Cargo.toml +++ b/amico-wasm/Cargo.toml @@ -14,7 +14,7 @@ default = ["console_error_panic_hook"] [dependencies] wasm-bindgen = "0.2.100" -amico-core = { path = "../amico-core" } +amico-core = { workspace = true } # The `console_error_panic_hook` crate provides better debugging of panics by # logging them with `console.error`. This is great for development, but requires