
Cano Logo

Cano: Type-Safe Async Workflow Engine


Orchestrate complex async processes with finite state machines, parallel execution, and built-in scheduling.

Overview

Cano is a high-performance orchestration engine designed for building resilient, self-healing systems in Rust. Unlike simple task queues, Cano uses Finite State Machines (FSMs) to define strict, type-safe transitions between processing steps.

It excels at managing complex lifecycles where state transitions matter:

  • Data Pipelines: ETL jobs with parallel processing (Split/Join) and aggregation.
  • AI Agents: Multi-step inference chains with shared context and memory.
  • Background Systems: Scheduled maintenance, periodic reporting, and distributed cron jobs.

The engine is built on three core concepts: Tasks/Nodes for logic, Workflows for state transitions, and Schedulers for timing.
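
As a first taste of how Tasks and Workflows fit together, here is a minimal two-state workflow. It is a sketch that sticks to the calls used in the larger example further down (Workflow::new, register with a closure, add_exit_state, orchestrate, MemoryStore); the enum and the printed message are purely illustrative.

use cano::prelude::*;

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
enum State {
    Greet,
    Done,
}

#[tokio::main]
async fn main() -> Result<(), CanoError> {
    let mut workflow = Workflow::new(State::Greet);
    let store = MemoryStore::new();

    workflow
        // A single closure task that prints and transitions to the exit state
        .register(State::Greet, |_store: &MemoryStore| async move {
            println!("Hello from Cano!");
            Ok(State::Done)
        })
        .add_exit_state(State::Done);

    // Drive the state machine until it reaches an exit state
    workflow.orchestrate(&store).await?;
    Ok(())
}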

Features

  • Type-Safe State Machines: Enum-driven transitions with compile-time guarantees.
  • Flexible Processing Units: Choose between simple Tasks or structured Nodes with a Prep/Exec/Post lifecycle (sketched after this list).
  • Parallel Execution (Split/Join): Run tasks concurrently and join results with strategies like All, Any, Quorum, or PartialResults.
  • Robust Retry Logic: Configurable strategies including exponential backoff with jitter.
  • Built-in Scheduling: Cron-based, interval, and manual triggers for background jobs.
  • Concurrent Workflows: Run multiple instances of the same workflow with quota management.
  • Observability: Integrated tracing support for deep insights into workflow execution.
  • Performance-Focused: Minimizes heap allocations by leveraging stack-based objects wherever possible, giving you control over where allocations occur.
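
To make the Prep/Exec/Post lifecycle concrete, the sketch below illustrates the pattern with a small, self-contained toy trait. This is not Cano's actual Node trait (the real trait and its signatures are covered in the documentation); it only shows how the three phases split a unit of work into preparation, execution, and follow-up.

use async_trait::async_trait;

// Toy trait illustrating the three-phase lifecycle; Cano's real Node trait differs.
#[async_trait]
trait LifecycleNode {
    type Prepared;
    type Output;

    // Prep: load inputs and validate preconditions
    async fn prep(&self) -> Self::Prepared;
    // Exec: the main work (the phase a retry policy would wrap)
    async fn exec(&self, prepared: Self::Prepared) -> Self::Output;
    // Post: persist results and decide the next state
    async fn post(&self, output: Self::Output) -> &'static str;
}

struct WordCount {
    text: String,
}

#[async_trait]
impl LifecycleNode for WordCount {
    type Prepared = Vec<String>;
    type Output = usize;

    async fn prep(&self) -> Self::Prepared {
        self.text.split_whitespace().map(str::to_owned).collect()
    }

    async fn exec(&self, prepared: Self::Prepared) -> Self::Output {
        prepared.len()
    }

    async fn post(&self, output: Self::Output) -> &'static str {
        println!("counted {output} words");
        "Complete"
    }
}

#[tokio::main]
async fn main() {
    let node = WordCount { text: "cano keeps workflow state explicit".into() };
    let prepared = node.prep().await;
    let output = node.exec(prepared).await;
    println!("next state: {}", node.post(output).await);
}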

Simple Example: Parallel Processing

Here is a real-world example showing how to split execution into parallel tasks and join them back together.

graph TD
    Start([Start]) --> Split{Split}
    Split -->|Source 1| T1[FetchSourceTask 1]
    Split -->|Source 2| T2[FetchSourceTask 2]
    Split -->|Source 3| T3[FetchSourceTask 3]
    T1 --> Join{Join All}
    T2 --> Join
    T3 --> Join
    Join --> Aggregate[Aggregate]
    Aggregate --> Complete([Complete])
use cano::prelude::*;
use std::time::Duration;

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
enum FlowState {
    Start,
    FetchData,
    Aggregate,
    Complete,
}

// A task that simulates fetching data from a source
#[derive(Clone)]
struct FetchSourceTask {
    source_id: u32,
}

#[async_trait::async_trait]
impl Task<FlowState> for FetchSourceTask {
    async fn run(&self, store: &MemoryStore) -> Result<FlowState, CanoError> {
        // Simulate async work
        tokio::time::sleep(Duration::from_millis(100)).await;
        
        // Store result
        let key = format!("source_{}", self.source_id);
        store.put(&key, format!("data_from_{}", self.source_id))?;
        
        Ok(FlowState::Aggregate)
    }
}

#[tokio::main]
async fn main() -> Result<(), CanoError> {
    let mut workflow = Workflow::new(FlowState::Start);
    let store = MemoryStore::new();

    // 1. Define parallel tasks
    let sources = vec![
        FetchSourceTask { source_id: 1 },
        FetchSourceTask { source_id: 2 },
        FetchSourceTask { source_id: 3 },
    ];

    // 2. Configure join strategy
    // Wait for ALL tasks to complete successfully before moving to Aggregate
    let join_config = JoinConfig::new(
        JoinStrategy::All,
        FlowState::Aggregate
    ).with_timeout(Duration::from_secs(5));

    // 3. Build Workflow
    workflow
        // Start -> Split into parallel tasks
        .register_split(
            FlowState::Start,
            sources,
            join_config
        )
        // Aggregate -> Complete (using a closure task for simplicity)
        .register(FlowState::Aggregate, |_store: &MemoryStore| async move {
            println!("All sources fetched! Aggregating...");
            Ok(FlowState::Complete)
        })
        .add_exit_state(FlowState::Complete);

    // 4. Run
    let result = workflow.orchestrate(&store).await?;
    println!("Workflow finished: {:?}", result);
    
    Ok(())
}

Documentation

For complete documentation, examples, and guides, please visit our website:

👉 https://nassor.github.io/cano/


Contributing

Contributions are welcome! Please feel free to submit a Pull Request.

License

This project is licensed under the MIT License - see the LICENSE file for details.
