Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,6 @@
/target
*.vscode
/chain/assets
/types/pkg
/types/pkg

targets
23 changes: 23 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ resolver = "2"
[workspace.dependencies]
alto-client = { version = "0.0.6", path = "client" }
alto-types = { version = "0.0.6", path = "types" }
commonware-broadcast = { version = "0.0.43" }
commonware-consensus = { version = "0.0.43" }
commonware-cryptography = { version = "0.0.43" }
commonware-deployer = { version = "0.0.43" }
Expand Down
1 change: 1 addition & 0 deletions chain/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ documentation = "https://docs.rs/alto-chain"
[dependencies]
alto-types = { workspace = true }
alto-client = { workspace = true }
commonware-broadcast = { workspace = true }
commonware-consensus = { workspace = true }
commonware-cryptography = { workspace = true }
commonware-deployer = { workspace = true }
Expand Down
59 changes: 59 additions & 0 deletions chain/src/actors/mempool/actor.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
use super::{ ingress::{Mailbox, Message}, mempool};
use commonware_broadcast::Broadcaster;
use commonware_cryptography::Digest;
use commonware_utils::Array;
use futures::{
channel::mpsc,
StreamExt,
};
use tracing::{error, warn, debug};


pub struct Actor<D: Digest, P: Array> {
mailbox: mpsc::Receiver<Message<D, P>>,
}

impl<D: Digest, P: Array> Actor<D, P> {
pub fn new() -> (Self, Mailbox<D, P>) {
let (sender, receiver) = mpsc::channel(1024);
(Actor { mailbox: receiver }, Mailbox::new(sender))
}

pub async fn run(mut self,
mut engine: impl Broadcaster<Digest = D>,
mut mempool: mempool::Mailbox<D>
) {
// it passes msgs in the mailbox of the actor to the engine mailbox
while let Some(msg) = self.mailbox.next().await {
match msg {
Message::Broadcast(payload) => {
debug!("broadcasting batch {}", payload);
let receiver = engine.broadcast(payload).await;
let result = receiver.await;
match result {
Ok(true) => {}
Ok(false) => {
error!("broadcast returned false")
}
Err(_) => {
error!("broadcast dropped")
}
}
}
Message::Verify(_context, _payload, sender) => {
debug!(digest=?_payload, "incoming verification request");
let Some(_) = mempool.get_batch(_payload).await else {
warn!(?_payload, "batch not exists");
let _ = sender.send(false);
continue;
};
debug!("issue verfication to batch={}", _payload);
let result = sender.send(true);
if result.is_err() {
error!("verify dropped");
}
}
}
}
}
}
62 changes: 62 additions & 0 deletions chain/src/actors/mempool/archive.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
use bytes::Bytes;
use commonware_runtime::{Blob, Metrics, Storage};
use commonware_storage::archive::{self, Archive, Identifier, Translator};
use commonware_utils::Array;
use futures::lock::Mutex;
use std::sync::Arc;

/// Archive wrapper that handles all locking.
#[derive(Clone)]
pub struct Wrapped<T, K, B, R>
where
T: Translator,
K: Array,
B: Blob,
R: Storage<B> + Metrics,
{
inner: Arc<Mutex<Archive<T, K, B, R>>>,
}

impl<T, K, B, R> Wrapped<T, K, B, R>
where
T: Translator,
K: Array,
B: Blob,
R: Storage<B> + Metrics,
{
/// Creates a new `Wrapped` from an existing `Archive`.
pub fn new(archive: Archive<T, K, B, R>) -> Self {
Self {
inner: Arc::new(Mutex::new(archive)),
}
}

/// Retrieves a value from the archive by identifier.
pub async fn get(
&self,
identifier: Identifier<'_, K>,
) -> Result<Option<Bytes>, archive::Error> {
let archive = self.inner.lock().await;
archive.get(identifier).await
}

/// Inserts a value into the archive with the given index and key.
pub async fn put(&self, index: u64, key: K, data: Bytes) -> Result<(), archive::Error> {
let mut archive = self.inner.lock().await;
archive.put(index, key, data).await?;
Ok(())
}

/// Prunes entries from the archive up to the specified minimum index.
pub async fn prune(&self, min_index: u64) -> Result<(), archive::Error> {
let mut archive = self.inner.lock().await;
archive.prune(min_index).await?;
Ok(())
}

/// Retrieves the next gap in the archive.
pub async fn next_gap(&self, start: u64) -> (Option<u64>, Option<u64>) {
let archive = self.inner.lock().await;
archive.next_gap(start)
}
}
79 changes: 79 additions & 0 deletions chain/src/actors/mempool/collector.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
use commonware_broadcast::{linked::Prover, Collector as Z, Proof, };
use commonware_cryptography::{bls12381::primitives::group, Digest, Scheme};
use futures::{
channel::mpsc,
SinkExt, StreamExt,
};
use tracing::error;

use super::mempool;

enum Message<C: Scheme, D: Digest> {
Acknowledged(Proof, D),
_Phantom(C::PublicKey),
}

pub struct Collector<C: Scheme, D: Digest> {
mailbox: mpsc::Receiver<Message<C, D>>,

// Application namespace
namespace: Vec<u8>,

// Public key of the group
public: group::Public,
}

impl<C: Scheme, D: Digest> Collector<C, D> {
pub fn new(namespace: &[u8], public: group::Public) -> (Self, Mailbox<C, D>) {
let (sender, receiver) = mpsc::channel(1024);
(
Collector {
mailbox: receiver,
namespace: namespace.to_vec(),
public,
},
Mailbox { sender },
)
}

pub async fn run(mut self, mut mempool: mempool::Mailbox<D>) {
while let Some(msg) = self.mailbox.next().await {
match msg {
Message::Acknowledged(proof, payload) => {
// Check proof.
// The prover checks the validity of the threshold signature when deserializing
let prover = Prover::<C, D>::new(&self.namespace, self.public);
let _ = match prover.deserialize_threshold(proof) {
Some((context, _payload, _epoch, _threshold)) => context,
None => {
error!("invalid proof");
continue;
}
};

// Acknowledge batch in mempool, mark the batch as ready for pickup
let acknowledge = mempool.acknowledge_batch(payload).await;
if !acknowledge {
error!("unable to acknowledge batch {}", payload)
}
}
_ => unreachable!()
}
}
}
}

#[derive(Clone)]
pub struct Mailbox<C: Scheme, D: Digest> {
sender: mpsc::Sender<Message<C, D>>,
}

impl<C: Scheme, D: Digest> Z for Mailbox<C, D> {
type Digest = D;
async fn acknowledged(&mut self, proof: Proof, payload: Self::Digest) {
self.sender
.send(Message::Acknowledged(proof, payload))
.await
.expect("Failed to send acknowledged");
}
}
93 changes: 93 additions & 0 deletions chain/src/actors/mempool/coordinator.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
use commonware_broadcast::{linked::Epoch, Coordinator as S, ThresholdCoordinator as T};
use commonware_cryptography::bls12381::primitives::{
group::{Public, Share},
poly::Poly,
};
use commonware_resolver::p2p;
use commonware_utils::Array;
use std::collections::HashMap;

// TODO: The implementation is copied from commonware-broadcast::linked::mocks, should be updated to track the
/// Implementation of `commonware-consensus::Coordinator`.
#[derive(Clone)]
pub struct Coordinator<P: Array> {
view: u64,
identity: Poly<Public>,
signers: Vec<P>,
signers_map: HashMap<P, u32>,
share: Share,
}

impl<P: Array> Coordinator<P> {
pub fn new(identity: Poly<Public>, mut signers: Vec<P>, share: Share) -> Self {
// Setup signers
signers.sort();
let mut signers_map = HashMap::new();
for (index, validator) in signers.iter().enumerate() {
signers_map.insert(validator.clone(), index as u32);
}

// Return coordinator
Self {
view: 0,
identity,
signers,
signers_map,
share,
}
}

pub fn set_view(&mut self, view: u64) {
self.view = view;
}
}

impl<P: Array> S for Coordinator<P> {
type Index = Epoch;
type PublicKey = P;

fn index(&self) -> Self::Index {
self.view
}

fn signers(&self, _: Self::Index) -> Option<&Vec<Self::PublicKey>> {
Some(&self.signers)
}

fn is_signer(&self, _: Self::Index, candidate: &Self::PublicKey) -> Option<u32> {
self.signers_map.get(candidate).cloned()
}

fn sequencers(&self, _: Self::Index) -> Option<&Vec<Self::PublicKey>> {
Some(&self.signers)
}

fn is_sequencer(&self, _: Self::Index, candidate: &Self::PublicKey) -> Option<u32> {
self.signers_map.get(candidate).cloned()
}
}

impl<P: Array> T for Coordinator<P> {
type Identity = Poly<Public>;
type Share = Share;

fn identity(&self, _: Self::Index) -> Option<&Self::Identity> {
Some(&self.identity)
}

fn share(&self, _: Self::Index) -> Option<&Self::Share> {
Some(&self.share)
}
}

impl <P: Array> p2p::Coordinator for Coordinator<P> {
type PublicKey = P;

fn peers(&self) -> &Vec<Self::PublicKey> {
&self.signers
}

fn peer_set_id(&self) -> u64 {
0
}
}
Loading