
(future-work) Making radkit agents fault tolerant. #33

@irshadnilam

Durable Agent State Machines

Goal: make agents fault tolerant and resumable (restart-safe and Lambda-safe) while keeping the A2A surface (tasks, messages, artifacts, status, streaming) intact. Internally, each agent becomes a durable state machine with explicit steps, commands, and receipts.

Why change

  • Current skills are single-shot; only input-required pauses are supported.
  • Long-running or fragile steps can crash, and a retry starts over because no intermediate state is saved.
  • We want deterministic replay, idempotent side effects, and mid-flow human input anywhere.

Core model

  • State: serializable data that represents “where the task is” plus any payload/history the next step needs.
  • Events: things that drive the state machine (user message, tool result, LLM result, timer/backoff fired).
  • Commands: side-effect intents emitted by a transition (call LLM, call tool, ask user, emit artifact/message, set timer). Each has an InvocationId and effect policy (idempotent/at-least-once/compensate).
  • Receipts: stored results of commands keyed by InvocationId. On replay, receipts short-circuit re-execution.
  • Journal: append-only log (or materialized record) of state + issued commands (+ receipts). Persisted before effects run.
  • Engine: feeds events through the reducer, persists state/commands, executes commands, records receipts, and maps internal state to A2A status.
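To make the model concrete, here is a minimal, std-only Rust sketch of a pure reducer that takes a state and an event and returns the next state plus commands. The types (`State`, `Event`, `Command`, `InvocationId`) and the `reduce` function are hypothetical stand-ins, not radkit's API; a real reducer would also carry payloads, effect policies, and A2A status hints.

```rust
/// Position in the flow (hypothetical; would be serializable in a real engine).
#[derive(Clone, Debug, PartialEq)]
pub enum State {
    Start,
    Researching,
    AwaitHuman { draft: String },
    Done,
}

/// Inputs that drive the machine.
#[derive(Clone, Debug, PartialEq)]
pub enum Event {
    UserMessage(String),
    LlmResult { tag: String, value: String },
}

#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub struct InvocationId(pub u64);

/// Side-effect intents: the engine executes them, the reducer never does.
#[derive(Clone, Debug, PartialEq)]
pub enum Command {
    CallLlm { id: InvocationId, tag: String, input: String },
    AskUser { prompt: String },
}

/// Pure transition with no I/O, so replaying the same events yields the same output.
/// `next_id` is supplied by the engine so the reducer stays deterministic.
pub fn reduce(state: State, event: Event, next_id: u64) -> (State, Vec<Command>) {
    match (state, event) {
        (State::Start, Event::UserMessage(question)) => (
            State::Researching,
            vec![Command::CallLlm {
                id: InvocationId(next_id),
                tag: "research".to_string(),
                input: question,
            }],
        ),
        (State::Researching, Event::LlmResult { tag, value }) if tag == "research" => (
            State::AwaitHuman { draft: value.clone() },
            vec![Command::AskUser { prompt: format!("Review this draft: {value}") }],
        ),
        (State::AwaitHuman { .. }, Event::UserMessage(_)) => (State::Done, Vec::new()),
        // Any other combination is ignored and the state is kept unchanged.
        (other, _) => (other, Vec::new()),
    }
}
```

Because `reduce` only returns data, the engine is free to persist the new state and commands before executing anything.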

Replay/fault tolerance

  1. Reducer returns: next state + commands (+ optional message/artifacts/status hints).
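  2. Engine assigns each command an InvocationId and persists {state, payload, pending_commands} with the IDs included, so a restart can reissue commands under the same IDs.
  3. Engine dispatches the persisted commands.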
  4. On command completion, store a receipt {InvocationId, result}.
  5. On crash/restart: load state + pending commands + receipts.
    • If a command has no receipt, reissue with same InvocationId.
    • If a receipt exists, feed the result as an event; do not re-run the side effect.
  6. Awaiting human input is just a state; engine exposes A2A input-required while in that state.
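The restart path in step 5 can be sketched as follows. This assumes a hypothetical in-memory `Journal` (a real engine would use durable storage) in which pending commands were persisted with their InvocationIds before dispatch; `replay` and the `execute` callback are illustrative names, not part of radkit.

```rust
use std::collections::HashMap;

/// Hypothetical durable record for one task (a database row in practice).
#[derive(Default)]
pub struct Journal {
    /// Commands persisted before dispatch, each with its InvocationId.
    pub pending: Vec<(u64, String)>,
    /// Receipts keyed by InvocationId, written when a command completes.
    pub receipts: HashMap<u64, String>,
}

/// Restart path: each pending command either short-circuits on its receipt or
/// is reissued under the same InvocationId. Returns the results to feed back
/// into the reducer as events, in the original command order.
pub fn replay<F>(journal: &mut Journal, mut execute: F) -> Vec<(u64, String)>
where
    F: FnMut(u64, &str) -> String,
{
    let mut events = Vec::new();
    for (id, cmd) in journal.pending.clone() {
        let result = match journal.receipts.get(&id).cloned() {
            // Receipt exists: the side effect already ran, so do not run it again.
            Some(stored) => stored,
            // No receipt: reissue with the same ID, then record the receipt.
            None => {
                let fresh = execute(id, cmd.as_str());
                journal.receipts.insert(id, fresh.clone());
                fresh
            }
        };
        events.push((id, result));
    }
    // Every command now has a receipt; the reducer takes it from here.
    journal.pending.clear();
    events
}
```

In the usage below, command 1 completed before the crash and command 2 did not, so only command 2 is executed again.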

A2A compatibility

  • Externally unchanged: one Task per flow, statuses working/input-required/completed/failed/rejected, artifacts/messages/events streamed over the existing bus.
  • Agent card still lists “skills”; each skill is an entrypoint into a flow/state machine.
  • Streaming/resubscribe works by replaying the journal into SSE just like today.
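Because awaiting input is just a state, the externally visible A2A status can be a pure function of the internal state. A minimal sketch with a hypothetical `FlowState` enum; only the status strings come from the list above.

```rust
/// Hypothetical internal state; only "where the task is" matters for status.
pub enum FlowState {
    Running { step: String },
    AwaitingInput { slot: String },
    Completed,
    Failed { reason: String },
    Rejected,
}

/// Map internal state onto the unchanged A2A task statuses.
pub fn a2a_status(state: &FlowState) -> &'static str {
    match state {
        FlowState::Running { .. } => "working",
        FlowState::AwaitingInput { .. } => "input-required",
        FlowState::Completed => "completed",
        FlowState::Failed { .. } => "failed",
        FlowState::Rejected => "rejected",
    }
}
```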

Authoring options

Option 1: No macros (explicit reducer)

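use radkit::v2::{FlowSkill, Event, Transition, Cmd, Ctx, AgentResult};
use radkit::agent::Artifact;
use radkit::models::Content;
use serde::{Deserialize, Serialize};

// Assumed app-defined critic output (hypothetical shape); only `decision` is read below.
#[derive(Serialize, Deserialize, Clone)]
enum Decision { Approved, NeedsRevision, NeedsHumanInput }

#[derive(Serialize, Deserialize, Clone)]
struct Critique { decision: Decision }

#[derive(Serialize, Deserialize, Clone)]
struct Cycle { draft: String, critique: Option<Critique>, human_note: Option<String> }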

#[derive(Serialize, Deserialize, Clone)]
enum State {
    Start,
    Researching { history: Vec<Cycle> },
    Critiquing { draft: String, history: Vec<Cycle> },
    AwaitHuman { draft: String, history: Vec<Cycle> },
    Done,
    Failed { reason: String },
}

struct ResearchLoop;

#[async_trait::async_trait]
impl FlowSkill for ResearchLoop {
    type State = State;

    async fn on_event(&self, state: State, event: Event, ctx: &Ctx) -> AgentResult<Transition<State>> {
        match (state, event) {
            (State::Start, Event::UserMessage { content }) => {
                Ok(Transition::to(State::Researching { history: vec![] })
                    .cmd(Cmd::llm("research", "Draft answer", content)))
            }
            (State::Researching { mut history }, Event::LlmResult { id, value }) if id == "research" => {
                let draft: String = ctx.decode(value)?;
                history.push(Cycle { draft: draft.clone(), critique: None, human_note: None });
                Ok(Transition::to(State::Critiquing { draft: draft.clone(), history })
                    .cmd(Cmd::llm("critic", "Critique draft", Content::from_text(draft))))
            }
            (State::Critiquing { draft, mut history }, Event::LlmResult { id, value }) if id == "critic" => {
                let verdict: Critique = ctx.decode(value)?;
                history.last_mut().unwrap().critique = Some(verdict.clone());
                match verdict.decision {
                    Decision::Approved => Ok(Transition::complete(State::Done)
                        .message(Content::from_text("Approved"))
                        .artifact(Artifact::from_text("final.txt", &draft))),
                    Decision::NeedsRevision => Ok(Transition::to(State::Researching { history })
                        .cmd(Cmd::llm("research", "Revise given critique", Content::from_text(draft)))),
                    Decision::NeedsHumanInput => Ok(Transition::await_input(
                        "human_note",
                        Content::from_text("Critic needs human input."),
                        State::AwaitHuman { draft, history },
                    )),
                }
            }
            (State::AwaitHuman { draft, mut history }, Event::UserMessage { content }) => {
                let note = content.joined_texts().unwrap_or_default();
                history.last_mut().unwrap().human_note = Some(note.clone());
                let merged = format!("{draft}\n\nHuman note: {note}");
                Ok(Transition::to(State::Researching { history })
                    .cmd(Cmd::llm("research", "Revise with human input", Content::from_text(merged))))
            }
            (state, _) => Ok(Transition::ignore(state)),
        }
    }
}

Option 2: Minimal macro sugar (named steps, no visible state enum)

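use radkit::v2::{flow_skill, step, StepCtx, StepResult, goto, await_input};
use radkit::agent::Artifact;
use radkit::models::Content;
use serde::{Deserialize, Serialize};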

#[derive(Serialize, Deserialize, Clone)]
struct Cycle { draft: String, critique: Option<Critique>, human_note: Option<String> }

#[flow_skill(id = "research_loop_v2", name = "Research Loop v2")]
pub struct ResearchLoop;

impl ResearchLoop {
    #[step(start)]
    async fn research(&self, ctx: &StepCtx, msg: Content, history: Vec<Cycle>) -> StepResult {
        let question = msg.joined_texts().unwrap_or_default();
        let history_text = history
            .iter()
            .enumerate()
            .map(|(i, c)| format!("Draft {}: {}", i + 1, c.draft))
            .collect::<Vec<_>>()
            .join("\n");
        let input = Content::from_text(format!(
            "Question:\n{}\n\nPrevious attempts:\n{}",
            question, history_text
        ));
        let draft: String = ctx.llm("research").run(input).await?;
        let mut hist = history;
        hist.push(Cycle { draft: draft.clone(), critique: None, human_note: None });
        goto!("critique", (draft, hist))
    }

    #[step]
    async fn critique(&self, ctx: &StepCtx, payload: (String, Vec<Cycle>)) -> StepResult {
        let (draft, mut history) = payload;
        let verdict: Critique = ctx.llm("critic").run(Content::from_text(&draft)).await?;
        history.last_mut().unwrap().critique = Some(verdict.clone());
        match verdict.decision {
            Decision::Approved => ctx.complete("Approved by critic.", vec![Artifact::from_text("final.txt", &draft)]),
            Decision::NeedsRevision => goto!("research", (Content::from_text(&draft), history)),
            Decision::NeedsHumanInput => await_input!(
                slot = "human_note",
                message = "Critic needs human input.",
                carry = (draft, history)
            ),
        }
    }

    #[step(resume_from = "critique")]
    async fn human_input(&self, payload: (String, Vec<Cycle>), msg: Content) -> StepResult {
        let (draft, mut history) = payload;
        let note = msg.joined_texts().unwrap_or_default();
        history.last_mut().unwrap().human_note = Some(note.clone());
        let merged = format!("{draft}\n\nHuman note: {note}");
        goto!("research", (Content::from_text(&merged), history))
    }
}

Option 3: No macros, step table (plain functions)

use radkit::v2::{AgentResult, Cmd, EffectPolicy, FlowSkill, Step, StepCtx, StepId, Transition};
use radkit::agent::Artifact;
use radkit::models::Content;
use serde::{Deserialize, Serialize};

#[derive(Serialize, Deserialize, Clone)]
struct Cycle { draft: String, critique: Option<Critique>, human_note: Option<String> }

#[derive(Serialize, Deserialize, Clone)]
struct StateData { history: Vec<Cycle> }

const STEP_RESEARCH: StepId = StepId::new("research");
const STEP_CRITIQUE: StepId = StepId::new("critique");
const STEP_HUMAN_INPUT: StepId = StepId::new("human_input");

struct ResearchLoop;

impl ResearchLoop {
    // Steps return AgentResult so that `?` can propagate LLM and decode errors.
    async fn research(&self, ctx: &StepCtx, state: StateData, msg: Content) -> AgentResult<Transition<StateData>> {
        let draft: String = ctx.llm("research").run(msg).await?;
        let mut history = state.history;
        history.push(Cycle { draft, critique: None, human_note: None });
        // `critique` calls the critic LLM itself, so no separate critic Cmd is emitted here.
        Ok(Transition::goto(STEP_CRITIQUE, StateData { history }))
    }

    async fn critique(&self, ctx: &StepCtx, state: StateData) -> AgentResult<Transition<StateData>> {
        let mut history = state.history;
        let draft = history.last().unwrap().draft.clone();
        let verdict: Critique = ctx.llm("critic").run(Content::from_text(&draft)).await?;
        history.last_mut().unwrap().critique = Some(verdict.clone());

        Ok(match verdict.decision {
            Decision::Approved => Transition::complete(StateData { history })
                .message(Content::from_text("Approved by critic."))
                .artifact(Artifact::from_text("final.txt", &draft)),
            Decision::NeedsRevision => Transition::goto(STEP_RESEARCH, StateData { history })
                .cmd(Cmd::llm("research", "Revise given critique", Content::from_text(draft))),
            Decision::NeedsHumanInput => Transition::await_input(
                "human_note",
                Content::from_text("Critic needs human input before revising."),
                StateData { history },
            ),
        })
    }

    async fn human_input(&self, _ctx: &StepCtx, state: StateData, msg: Content) -> AgentResult<Transition<StateData>> {
        let mut history = state.history;
        let note = msg.joined_texts().unwrap_or_default();
        history.last_mut().unwrap().human_note = Some(note.clone());
        let merged = format!("{}\n\nHuman note: {note}", history.last().unwrap().draft);
        Ok(Transition::goto(STEP_RESEARCH, StateData { history })
            .cmd(Cmd::llm("research", "Revise with human input", Content::from_text(merged))))
    }
}

static STEPS: &[Step<StateData>] = &[
    Step::new(STEP_RESEARCH, ResearchLoop::research).start().takes_user_message(),
    Step::new(STEP_CRITIQUE, ResearchLoop::critique),
    Step::new(STEP_HUMAN_INPUT, ResearchLoop::human_input).resume_from(STEP_CRITIQUE),
];

impl FlowSkill for ResearchLoop {
    type Payload = StateData;
    fn steps(&self) -> &'static [Step<Self::Payload>] { STEPS }
}
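The step table leaves routing to the engine. The sketch below models only that routing in plain Rust: a hypothetical `StepSpec` mirrors `.start()` and `.resume_from(...)`, and `route_user_message` picks the start step for a new task or the resume step after `await_input`. Dropping messages that arrive while a step is running is an assumption of this sketch; a real engine might queue them instead.

```rust
/// Hypothetical, std-only stand-in for one row of the step table.
pub struct StepSpec {
    pub id: &'static str,
    pub start: bool,
    pub resume_from: Option<&'static str>,
}

pub const STEP_TABLE: &[StepSpec] = &[
    StepSpec { id: "research", start: true, resume_from: None },
    StepSpec { id: "critique", start: false, resume_from: None },
    StepSpec { id: "human_input", start: false, resume_from: Some("critique") },
];

/// Where the persisted task currently sits.
pub enum Cursor<'a> {
    /// No state persisted yet: this is the first user message.
    New,
    /// A step is scheduled or running (e.g. after a goto).
    Running { step: &'a str },
    /// `await_input` was issued from this step.
    AwaitingInput { paused_in: &'a str },
}

/// Pick the step a user message should be routed to, if any.
pub fn route_user_message(steps: &[StepSpec], cursor: Cursor<'_>) -> Option<&'static str> {
    match cursor {
        Cursor::New => steps.iter().find(|s| s.start).map(|s| s.id),
        Cursor::AwaitingInput { paused_in } => steps
            .iter()
            .find(|s| s.resume_from == Some(paused_in))
            .map(|s| s.id),
        // The task is not waiting for the user; this sketch does not route the message.
        Cursor::Running { .. } => None,
    }
}
```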

Fault tolerance (provided by the engine; no macros involved):

  • Engine persists {step_name, payload, issued_commands} before executing commands.
  • Commands carry InvocationId; receipts are stored. On replay: pending commands without receipts are reissued with the same ID; commands with receipts are skipped and their results are fed into the reducer.
  • await_input sets a persisted “awaiting user” state; after restart the engine keeps the task input-required until a new user message arrives and routes to the resume_from step.
  • Side-effect policy is explicit on Cmd (e.g., EffectPolicy::AtLeastOnce) to control dedupe/compensation on replay.
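One possible reading of how `EffectPolicy` could drive replay decisions, as a hypothetical sketch. The three variants mirror the idempotent / at-least-once / compensate policies named in the core model; the `ReplayAction` names and the choice to reissue after compensating are assumptions, not part of the proposal.

```rust
/// Hypothetical policies mirroring the proposal's three effect kinds.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum EffectPolicy {
    Idempotent,
    AtLeastOnce,
    Compensate,
}

#[derive(Debug, PartialEq)]
pub enum ReplayAction {
    /// Result already recorded: never re-run the effect.
    UseReceipt,
    /// Re-running is harmless, so just reissue with the same InvocationId.
    Reissue,
    /// Reissue; the callee is expected to dedupe on the InvocationId.
    ReissueWithDedupe,
    /// The effect may be partially applied: undo it, then reissue.
    CompensateThenReissue,
}

/// Decide what to do with one pending command during replay.
pub fn on_replay(policy: EffectPolicy, has_receipt: bool) -> ReplayAction {
    if has_receipt {
        return ReplayAction::UseReceipt;
    }
    match policy {
        EffectPolicy::Idempotent => ReplayAction::Reissue,
        EffectPolicy::AtLeastOnce => ReplayAction::ReissueWithDedupe,
        EffectPolicy::Compensate => ReplayAction::CompensateThenReissue,
    }
}
```

A receipt always wins regardless of policy; the policy only matters for commands that never recorded a result.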

How the engine seeds args

  • For the first user message, the engine calls the start step with:
    • The inbound Content from the user (if the step requests it).
    • Default payloads for extra params that implement Default (e.g., history: Vec<Cycle> starts empty).
  • On subsequent events/resumes, the engine reloads the persisted payload/state and replays it into the step; no guessing required.
  • Authoring rules stay simple:
    • Steps can take &StepCtx and any serializable payload parameters; if a parameter doesn’t come from the event (e.g., history), it must be Deserialize and either persisted already or Default-constructible on first call.
    • “Await input” resume steps must declare the payload they expect plus the incoming Content if needed.
    • Commands/results are typed via ctx.decode::<T>(...) using the schema specified in the command.
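The seeding rule for parameters the event does not supply (use the persisted value if present, otherwise `Default`) fits in a few lines. `seed_payload`, `step_args`, and this trimmed `Cycle` are hypothetical; serde is omitted to keep the sketch dependency-free.

```rust
/// Trimmed, hypothetical version of the document's `Cycle`.
#[derive(Clone, Debug, Default, PartialEq)]
pub struct Cycle {
    pub draft: String,
}

/// Persisted payload wins; on the first call there is none, so use Default.
pub fn seed_payload<T: Default>(persisted: Option<T>) -> T {
    persisted.unwrap_or_default()
}

/// Arguments for a step shaped like `research(msg, history)`: the user's text
/// comes from the event, `history` from the journal or from Default.
pub fn step_args(user_text: &str, persisted: Option<Vec<Cycle>>) -> (String, Vec<Cycle>) {
    (user_text.to_string(), seed_payload(persisted))
}
```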

Engine responsibilities

  • Route events to the right step/reducer (by step name or generated state).
  • Persist state + payload + pending commands before executing commands.
  • Assign InvocationId per command; store receipts. On replay, reissue or short-circuit.
  • Map “await input” state to A2A input-required; otherwise working until complete/fail.
  • Emit artifacts/messages/progress to the existing event bus; SSE resubscribe replays the journal.
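A resubscribe that replays the journal can be modeled as filtering entries after the client's last-seen sequence number. This assumes journal entries carry a monotonically increasing `seq` (similar in spirit to the SSE `Last-Event-ID` mechanism); `JournalEntry` and `resubscribe` are hypothetical.

```rust
/// Hypothetical journal entry as seen by the event bus.
#[derive(Clone, Debug, PartialEq)]
pub struct JournalEntry {
    pub seq: u64,
    /// e.g. "status", "message", "artifact"
    pub kind: &'static str,
    pub body: String,
}

/// Replay every entry after the client's last-seen sequence number, in journal
/// order, before switching the subscriber over to live events.
pub fn resubscribe(journal: &[JournalEntry], last_seen: Option<u64>) -> Vec<JournalEntry> {
    journal
        .iter()
        .filter(|e| last_seen.map_or(true, |seen| e.seq > seen))
        .cloned()
        .collect()
}
```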

Command and receipt shape (illustrative)

enum Cmd {
    Llm {
        id: String,
        model: String,
        prompt: String,
        input: Content,
        expect: SchemaId, // schema used to decode the result into a typed value
        invocation_id: InvocationId,
    },
    Tool {
        id: String,
        name: String,
        args: serde_json::Value,
        effect: EffectPolicy,
        invocation_id: InvocationId,
    },
    AskUser { slot: String, message: Content, schema: SchemaId },
    EmitArtifact(Artifact),
    EmitMessage(Content),
    Sleep(Duration),
}

struct Receipt {
    invocation_id: InvocationId,
    outcome: serde_json::Value, // decoded into a typed value via the command's `expect` schema
    status: ReceiptStatus,
}
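Reissuing "with the same InvocationId" after a crash only works if the ID is persisted with the command or can be recomputed from persisted data. A hypothetical scheme, assumed here, derives it from the task ID, the transition's sequence number, and the command's index within that transition.

```rust
/// Hypothetical ID type; the document leaves `InvocationId` abstract.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct InvocationId(pub String);

/// Same inputs always give the same ID, so a restarted engine can recompute it.
pub fn invocation_id(task_id: &str, transition_seq: u64, cmd_index: usize) -> InvocationId {
    InvocationId(format!("{task_id}:{transition_seq}:{cmd_index}"))
}

/// Tag the commands emitted by one transition before they are persisted.
pub fn assign_ids(task_id: &str, transition_seq: u64, cmds: &[&str]) -> Vec<(InvocationId, String)> {
    cmds.iter()
        .enumerate()
        .map(|(i, cmd)| (invocation_id(task_id, transition_seq, i), cmd.to_string()))
        .collect()
}
```

The `:` separator assumes task IDs never contain `:`; a production scheme might hash the parts instead.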

Running on Lambda / short-lived hosts

  • Put journal/state/receipts in durable storage (e.g., DynamoDB).
  • Drive the engine by events from a queue (SQS) or HTTP trigger; one reducer pass per Lambda invocation.
  • Long LLM/tool calls can be dispatched as commands to worker Lambdas; use InvocationId to dedupe.
  • Await-input states persist; the next invocation that sees a user message resumes from the stored state.
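With SQS in front of worker Lambdas, a worker can receive the same command more than once (standard queues deliver at least once), so it should check for a receipt before running the side effect. The `Worker` below is a hypothetical in-memory sketch in which a `HashMap` stands in for a durable receipts table.

```rust
use std::collections::HashMap;

/// Hypothetical worker that handles commands keyed by InvocationId.
#[derive(Default)]
pub struct Worker {
    /// Stand-in for a durable receipts table keyed by InvocationId.
    receipts: HashMap<String, String>,
    /// How many times the real side effect actually ran.
    pub executions: u32,
}

impl Worker {
    pub fn handle(&mut self, invocation_id: &str, input: &str) -> String {
        if let Some(done) = self.receipts.get(invocation_id) {
            // Duplicate delivery: return the stored result without re-running.
            return done.clone();
        }
        // The expensive LLM or tool call would happen here.
        self.executions += 1;
        let result = format!("result for {input}");
        self.receipts.insert(invocation_id.to_string(), result.clone());
        result
    }
}
```

In DynamoDB the receipt write would typically be a conditional put (for example with `attribute_not_exists` on the key), so that only one completion is recorded per ID. Two concurrent workers may still both run the effect, which is exactly what an at-least-once policy has to tolerate.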

What stays A2A-compatible

  • Same RPCs/routes: message/send, message/stream, tasks/get, agent card.
  • Same statuses: working, input-required, completed, failed, rejected.
  • Same artifacts/messages surface; step timeline is internal unless exposed as optional artifacts/extensions.

Development notes

  • Keep the reducer pure (no side effects inside); all effects go out via Cmd.
  • Prefer typed payloads; avoid serde_json::Value in user code except at boundaries.
  • Use EffectPolicy for side effects (idempotent vs at-least-once) to control replay behavior.
  • Tests: feed synthetic events into the reducer, assert resulting state/commands; fuzz with proptest for edge cases.
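A reducer test in that style, as a std-only sketch: feed a fixed sequence of synthetic events, check the final state and emitted commands, and check that folding the same events twice gives identical results. The retry reducer (`reduce_retry`, `run`, `S`, `Ev`) is hypothetical; a proptest suite could generate the event sequences instead of listing them by hand.

```rust
/// Hypothetical retry flow used only to illustrate the testing style.
#[derive(Clone, Debug, PartialEq)]
pub enum S {
    Idle,
    Waiting { attempts: u32 },
    Done,
    Failed,
}

#[derive(Clone, Debug)]
pub enum Ev {
    Start,
    ToolFailed,
    ToolSucceeded,
}

const MAX_ATTEMPTS: u32 = 3;

/// Pure reducer: (state, event) -> (next state, command names).
pub fn reduce_retry(s: S, e: &Ev) -> (S, Vec<String>) {
    match (s, e) {
        (S::Idle, Ev::Start) => (S::Waiting { attempts: 1 }, vec!["call_tool".to_string()]),
        (S::Waiting { attempts }, Ev::ToolFailed) if attempts < MAX_ATTEMPTS => {
            (S::Waiting { attempts: attempts + 1 }, vec!["call_tool".to_string()])
        }
        (S::Waiting { .. }, Ev::ToolFailed) => (S::Failed, vec!["report_failure".to_string()]),
        (S::Waiting { .. }, Ev::ToolSucceeded) => (S::Done, vec!["emit_artifact".to_string()]),
        // Unexpected events leave the state unchanged and emit nothing.
        (other, _) => (other, Vec::new()),
    }
}

/// Fold a whole event sequence, collecting every emitted command.
pub fn run(events: &[Ev]) -> (S, Vec<String>) {
    let mut state = S::Idle;
    let mut commands = Vec::new();
    for e in events {
        let (next, mut emitted) = reduce_retry(state, e);
        state = next;
        commands.append(&mut emitted);
    }
    (state, commands)
}
```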
