Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion amico-core/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "amico-core"
version = "1.0.2"
version = "1.1.0"
edition = "2024"
description = "The core Agent components of the Amico AI Agent Framework"
repository = "https://github.com/AIMOverse/amico"
Expand Down
84 changes: 65 additions & 19 deletions amico-core/src/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use tokio_with_wasm::alias as tokio;

use crate::{
traits::{EventSource, Strategy, System},
types::{AgentEvent, EventContent, Instruction},
types::{AgentEvent, Control, EventContent},
world::WorldManager,
};

Expand All @@ -21,10 +21,10 @@ use crate::{
/// - WASM: compatible.
pub struct Agent<S: Strategy> {
/// The mpsc channel sender to send agent events to event sources.
event_tx: Sender<AgentEvent>,
event_tx: Sender<EventWithTx>,

/// The mpsc channel receiver to receive agent events from event sources.
event_rx: Receiver<AgentEvent>,
event_rx: Receiver<EventWithTx>,

/// The ECS world manager.
wm: WorldManager,
Expand Down Expand Up @@ -67,16 +67,40 @@ impl<S: Strategy> Agent<S> {
// Spawn the thread.
let jh = event_source.spawn(move |event| {
tracing::debug!("On AgentEvent {:?}", event);
let tx = event_tx.clone();
let event_tx = event_tx.clone();

async move {
let name = event.name;

if let Err(err) = tx.send(event).await {
// Create a new channel for the reply message.
let (tx, mut rx) = channel(1);

if let Err(err) = event_tx
.send(EventWithTx {
tx: Some(tx),
event,
})
.await
{
tracing::warn!("Failed to send AgentEvent {}", err);
} else {
tracing::info!("Sent AgentEvent {}", name);
}

// Wait and return the reply message.
rx.recv()
.await
.inspect(|reply| {
if let Some(reply) = reply {
tracing::debug!("Received reply message: {:?}", reply);
} else {
tracing::debug!("Received no reply message");
}
})
.unwrap_or_else(|| {
tracing::error!("Failed to receive reply message: channel closed");
None
})
}
});

Expand All @@ -93,12 +117,18 @@ impl<S: Strategy> Agent<S> {
}

// Send a termination instruction to signal the main loop to exit
let terminate_event = AgentEvent::new("Terminate", "spawn_event_source")
.instruction(Instruction::Terminate);
let terminate_event =
AgentEvent::new("Terminate", "spawn_event_source").control(Control::Quit);

// Try to send the termination event, but don't panic if it fails
// (channel might already be closed)
if let Err(err) = event_tx.send(terminate_event).await {
if let Err(err) = event_tx
.send(EventWithTx {
tx: None,
event: terminate_event,
})
.await
{
tracing::warn!("Failed to send termination event: {}", err);
}
});
Expand All @@ -117,28 +147,38 @@ impl<S: Strategy> Agent<S> {
/// `run` dispatches `AgentEvent`s into the ECS `World` based on the Agent's strategy.
pub async fn run(&mut self) {
// Listen for events sent by event sources.
while let Some(event) = self.event_rx.recv().await {
while let Some(event_with_tx) = self.event_rx.recv().await {
let EventWithTx { tx, event } = event_with_tx;
tracing::debug!("Received AgentEvent {:?}", event);

if let Some(EventContent::Instruction(instruction)) = event.content {
// Received an instruction
tracing::debug!("Received instruction {:?}", instruction);
match instruction {
if let Some(EventContent::Control(control)) = event.content {
// Received a control instruction
tracing::debug!("Received control instruction {:?}", control);
match control {
// TODO: process other instructions
Instruction::Terminate => {
tracing::info!("Terminating event loop due to Terminate instruction");
Control::Quit => {
tracing::info!("Terminating event loop due to Quit control instruction");
break; // Exit the event loop immediately
}
}
} else {
// The event is not an instruction, dispatch the event to the `World`.
tracing::debug!("Dispatching event {:?}", event);
if let Err(err) = self
tracing::debug!("Processing event {:?}", event);
let reply = self
.strategy
.deliberate(&event, self.wm.action_sender())
.await
{
tracing::error!("Error dispatching event {:?}: {}", event, err);
.unwrap_or_else(|err| {
// Report the error and return `None` to indicate no reply.
tracing::error!("Error processing event {:?}: {}", event, err);
None
});

// Send the reply message back to the event source if needed.
if let Some(tx) = tx {
if let Err(err) = tx.send(reply).await {
tracing::error!("Failed to send reply message: {}", err);
}
}
}
}
Expand All @@ -155,3 +195,9 @@ pub enum OnFinish {
// Stop the Agent workflow when the thread finishes.
Stop,
}

/// A struct for the reply message to send back to the event source.
struct EventWithTx {
tx: Option<Sender<Option<String>>>,
event: AgentEvent,
}
2 changes: 1 addition & 1 deletion amico-core/src/traits/event_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,5 @@ pub trait EventSource {
fn spawn<F, Fut>(&self, on_event: F) -> JoinHandle<anyhow::Result<()>>
where
F: Fn(AgentEvent) -> Fut + Send + Sync + 'static,
Fut: Future<Output = ()> + Send + 'static;
Fut: Future<Output = Option<String>> + Send + 'static;
}
18 changes: 15 additions & 3 deletions amico-core/src/traits/strategy.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,27 @@
use std::future::Future;

use crate::{types::AgentEvent, world::ActionSender};

/// The Agent's action selection strategy.
///
/// Actions for the Agent is one or several ECS events sent to the ECS `World`.
pub trait Strategy {
/// Responsible for selecting actions based on the `AgentEvent` received.
///
/// The returned **reply** type `String` should not be confused with **the result**
/// of the deliberation process. The agent may generate a reply message to the user,
/// especially when the `AgentEvent` is a `Interaction`. But in other cases, the
/// agent may not produce a reply message, unless it thinks replying to the user
/// is necessary.
///
/// The result of the deliberation process is the `Action`s sent to the ECS `World`.
///
/// # Returns
///
/// - `Ok(None)`: The strategy does not want to produce a reply message.
/// - `Ok(Some(reply))`: The strategy wants to produce a reply message.
/// - `Err(e)`: The strategy is unable to process the `AgentEvent`.
fn deliberate(
&mut self,
agent_event: &AgentEvent,
sender: ActionSender,
) -> impl Future<Output = anyhow::Result<()>>;
) -> impl Future<Output = anyhow::Result<Option<String>>>;
}
79 changes: 67 additions & 12 deletions amico-core/src/types/agent_event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use serde_json::Value;

use crate::errors::AgentEventError;

use super::Instruction;
use super::{Control, Interaction};

/// Struct representing an event the agent receives.
///
Expand All @@ -32,12 +32,12 @@ use super::Instruction;
/// ## Create an event with instruction
///
/// ```
/// use amico_core::types::{AgentEvent, Instruction, EventContent};
/// use amico_core::types::{AgentEvent, Control, EventContent};
///
/// let event = AgentEvent::new("test", "TestSource")
/// .instruction(Instruction::Terminate);
/// .control(Control::Quit);
///
/// assert_eq!(event.content, Some(EventContent::Instruction(Instruction::Terminate)));
/// assert_eq!(event.content, Some(EventContent::Control(Control::Quit)));
/// ```
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AgentEvent {
Expand All @@ -62,8 +62,14 @@ pub struct AgentEvent {
/// Either some content value, or an instruction for the agent.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub enum EventContent {
/// The serialized content data of the event.
Content(Value),
Instruction(Instruction),

/// A control instruction to the agent.
Control(Control),

/// An interaction with the agent.
Interaction(Interaction),
}

impl AgentEvent {
Expand Down Expand Up @@ -141,23 +147,23 @@ impl AgentEvent {
}
}

/// Adds an instruction to the event.
/// Adds a control instruction to the event.
///
/// Setting `instruction` will override any existing instruction or content.
/// Setting `control` will override any existing control or content.
///
/// # Examples
///
/// ```
/// use amico_core::types::{AgentEvent, Instruction, EventContent};
/// use amico_core::types::{AgentEvent, Control, EventContent};
///
/// let event = AgentEvent::new("test", "TestSource")
/// .instruction(Instruction::Terminate);
/// .control(Control::Quit);
///
/// assert_eq!(event.content, Some(EventContent::Instruction(Instruction::Terminate)));
/// assert_eq!(event.content, Some(EventContent::Control(Control::Quit)));
/// ```
pub fn instruction(self, instruction: Instruction) -> Self {
pub fn control(self, instruction: Control) -> Self {
Self {
content: Some(EventContent::Instruction(instruction)),
content: Some(EventContent::Control(instruction)),
..self
}
}
Expand Down Expand Up @@ -242,6 +248,55 @@ impl AgentEvent {
None => Err(AgentEventError::ContentError("Content is None")),
}
}

/// Adds an interaction to the event.
///
/// Setting `interaction` will override any existing interaction, control, or content.
///
/// # Examples
///
/// ```
/// use amico_core::types::{AgentEvent, Interaction, Chat, EventContent};
///
/// let event = AgentEvent::new("test", "TestSource")
/// .interaction(Chat::new().into_interaction());
///
/// assert_eq!(event.content, Some(EventContent::Interaction(Chat::new().into_interaction())));
/// ```
pub fn interaction(self, interaction: Interaction) -> Self {
Self {
content: Some(EventContent::Interaction(interaction)),
..self
}
}

/// Gets the interaction from the event. Returns `None` if the event does not contain an interaction.
///
/// # Examples
///
/// ```
/// use amico_core::types::{AgentEvent, Interaction, Chat, EventContent, Control};
///
/// let event = AgentEvent::new("test", "TestSource")
/// .interaction(Chat::new().into_interaction());
///
/// let interaction = event.get_interaction();
///
/// assert_eq!(interaction, Some(&Chat::new().into_interaction()));
///
/// let event = AgentEvent::new("test", "TestSource")
/// .control(Control::Quit);
///
/// let interaction = event.get_interaction();
///
/// assert_eq!(interaction, None);
/// ```
pub fn get_interaction(&self) -> Option<&Interaction> {
match &self.content {
Some(EventContent::Interaction(interaction)) => Some(interaction),
_ => None,
}
}
}

#[cfg(test)]
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
use serde::{Deserialize, Serialize};

/// An instruction to the Agent, e.g. quit.
/// A control instruction to the Agent, e.g. quit.
///
/// TODO: Define more instructions.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub enum Instruction {
/// Signal to terminate the agent event loop
Terminate,
pub enum Control {
/// Signal to quit the agent event loop
Quit,
}
Loading