diff --git a/.gitignore b/.gitignore index 18dfef8f..1bfca715 100644 --- a/.gitignore +++ b/.gitignore @@ -3,4 +3,6 @@ /target *.vscode /chain/assets -/types/pkg \ No newline at end of file +/types/pkg + +targets \ No newline at end of file diff --git a/Cargo.lock b/Cargo.lock index af3e7950..28d57788 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -45,6 +45,7 @@ dependencies = [ "axum", "bytes", "clap", + "commonware-broadcast", "commonware-consensus", "commonware-cryptography", "commonware-deployer", @@ -805,6 +806,28 @@ version = "1.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5b63caa9aa9397e2d9480a9b13673856c78d8ac123288526c37d7839f2a86990" +[[package]] +name = "commonware-broadcast" +version = "0.0.43" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b2845a125a1996d82604970a3d72c4fd68e314432ff33023dafbde3a0d788ca2" +dependencies = [ + "bytes", + "commonware-cryptography", + "commonware-macros", + "commonware-p2p", + "commonware-runtime", + "commonware-storage", + "commonware-utils", + "futures", + "prometheus-client", + "prost", + "prost-build", + "rand", + "thiserror 2.0.12", + "tracing", +] + [[package]] name = "commonware-consensus" version = "0.0.43" diff --git a/Cargo.toml b/Cargo.toml index a257cb96..334ede8e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -10,6 +10,7 @@ resolver = "2" [workspace.dependencies] alto-client = { version = "0.0.6", path = "client" } alto-types = { version = "0.0.6", path = "types" } +commonware-broadcast = { version = "0.0.43" } commonware-consensus = { version = "0.0.43" } commonware-cryptography = { version = "0.0.43" } commonware-deployer = { version = "0.0.43" } diff --git a/chain/Cargo.toml b/chain/Cargo.toml index 20823756..127d3fde 100644 --- a/chain/Cargo.toml +++ b/chain/Cargo.toml @@ -13,6 +13,7 @@ documentation = "https://docs.rs/alto-chain" [dependencies] alto-types = { workspace = true } alto-client = { workspace = true } +commonware-broadcast = { workspace = true } commonware-consensus = { workspace = true } commonware-cryptography = { workspace = true } commonware-deployer = { workspace = true } diff --git a/chain/src/actors/mempool/actor.rs b/chain/src/actors/mempool/actor.rs new file mode 100644 index 00000000..ae47cd76 --- /dev/null +++ b/chain/src/actors/mempool/actor.rs @@ -0,0 +1,59 @@ +use super::{ ingress::{Mailbox, Message}, mempool}; +use commonware_broadcast::Broadcaster; +use commonware_cryptography::Digest; +use commonware_utils::Array; +use futures::{ + channel::mpsc, + StreamExt, +}; +use tracing::{error, warn, debug}; + + +pub struct Actor { + mailbox: mpsc::Receiver>, +} + +impl Actor { + pub fn new() -> (Self, Mailbox) { + let (sender, receiver) = mpsc::channel(1024); + (Actor { mailbox: receiver }, Mailbox::new(sender)) + } + + pub async fn run(mut self, + mut engine: impl Broadcaster, + mut mempool: mempool::Mailbox + ) { + // it passes msgs in the mailbox of the actor to the engine mailbox + while let Some(msg) = self.mailbox.next().await { + match msg { + Message::Broadcast(payload) => { + debug!("broadcasting batch {}", payload); + let receiver = engine.broadcast(payload).await; + let result = receiver.await; + match result { + Ok(true) => {} + Ok(false) => { + error!("broadcast returned false") + } + Err(_) => { + error!("broadcast dropped") + } + } + } + Message::Verify(_context, _payload, sender) => { + debug!(digest=?_payload, "incoming verification request"); + let Some(_) = mempool.get_batch(_payload).await else { + warn!(?_payload, "batch not exists"); + let _ = sender.send(false); + continue; + }; + debug!("issue verfication to batch={}", _payload); + let result = sender.send(true); + if result.is_err() { + error!("verify dropped"); + } + } + } + } + } +} diff --git a/chain/src/actors/mempool/archive.rs b/chain/src/actors/mempool/archive.rs new file mode 100644 index 00000000..9898cc46 --- /dev/null +++ b/chain/src/actors/mempool/archive.rs @@ -0,0 +1,62 @@ +use bytes::Bytes; +use commonware_runtime::{Blob, Metrics, Storage}; +use commonware_storage::archive::{self, Archive, Identifier, Translator}; +use commonware_utils::Array; +use futures::lock::Mutex; +use std::sync::Arc; + +/// Archive wrapper that handles all locking. +#[derive(Clone)] +pub struct Wrapped +where + T: Translator, + K: Array, + B: Blob, + R: Storage + Metrics, +{ + inner: Arc>>, +} + +impl Wrapped +where + T: Translator, + K: Array, + B: Blob, + R: Storage + Metrics, +{ + /// Creates a new `Wrapped` from an existing `Archive`. + pub fn new(archive: Archive) -> Self { + Self { + inner: Arc::new(Mutex::new(archive)), + } + } + + /// Retrieves a value from the archive by identifier. + pub async fn get( + &self, + identifier: Identifier<'_, K>, + ) -> Result, archive::Error> { + let archive = self.inner.lock().await; + archive.get(identifier).await + } + + /// Inserts a value into the archive with the given index and key. + pub async fn put(&self, index: u64, key: K, data: Bytes) -> Result<(), archive::Error> { + let mut archive = self.inner.lock().await; + archive.put(index, key, data).await?; + Ok(()) + } + + /// Prunes entries from the archive up to the specified minimum index. + pub async fn prune(&self, min_index: u64) -> Result<(), archive::Error> { + let mut archive = self.inner.lock().await; + archive.prune(min_index).await?; + Ok(()) + } + + /// Retrieves the next gap in the archive. + pub async fn next_gap(&self, start: u64) -> (Option, Option) { + let archive = self.inner.lock().await; + archive.next_gap(start) + } +} diff --git a/chain/src/actors/mempool/collector.rs b/chain/src/actors/mempool/collector.rs new file mode 100644 index 00000000..9b746887 --- /dev/null +++ b/chain/src/actors/mempool/collector.rs @@ -0,0 +1,79 @@ +use commonware_broadcast::{linked::Prover, Collector as Z, Proof, }; +use commonware_cryptography::{bls12381::primitives::group, Digest, Scheme}; +use futures::{ + channel::mpsc, + SinkExt, StreamExt, +}; +use tracing::error; + +use super::mempool; + +enum Message { + Acknowledged(Proof, D), + _Phantom(C::PublicKey), +} + +pub struct Collector { + mailbox: mpsc::Receiver>, + + // Application namespace + namespace: Vec, + + // Public key of the group + public: group::Public, +} + +impl Collector { + pub fn new(namespace: &[u8], public: group::Public) -> (Self, Mailbox) { + let (sender, receiver) = mpsc::channel(1024); + ( + Collector { + mailbox: receiver, + namespace: namespace.to_vec(), + public, + }, + Mailbox { sender }, + ) + } + + pub async fn run(mut self, mut mempool: mempool::Mailbox) { + while let Some(msg) = self.mailbox.next().await { + match msg { + Message::Acknowledged(proof, payload) => { + // Check proof. + // The prover checks the validity of the threshold signature when deserializing + let prover = Prover::::new(&self.namespace, self.public); + let _ = match prover.deserialize_threshold(proof) { + Some((context, _payload, _epoch, _threshold)) => context, + None => { + error!("invalid proof"); + continue; + } + }; + + // Acknowledge batch in mempool, mark the batch as ready for pickup + let acknowledge = mempool.acknowledge_batch(payload).await; + if !acknowledge { + error!("unable to acknowledge batch {}", payload) + } + } + _ => unreachable!() + } + } + } +} + +#[derive(Clone)] +pub struct Mailbox { + sender: mpsc::Sender>, +} + +impl Z for Mailbox { + type Digest = D; + async fn acknowledged(&mut self, proof: Proof, payload: Self::Digest) { + self.sender + .send(Message::Acknowledged(proof, payload)) + .await + .expect("Failed to send acknowledged"); + } +} \ No newline at end of file diff --git a/chain/src/actors/mempool/coordinator.rs b/chain/src/actors/mempool/coordinator.rs new file mode 100644 index 00000000..da796a12 --- /dev/null +++ b/chain/src/actors/mempool/coordinator.rs @@ -0,0 +1,93 @@ +use commonware_broadcast::{linked::Epoch, Coordinator as S, ThresholdCoordinator as T}; +use commonware_cryptography::bls12381::primitives::{ + group::{Public, Share}, + poly::Poly, +}; +use commonware_resolver::p2p; +use commonware_utils::Array; +use std::collections::HashMap; + +// TODO: The implementation is copied from commonware-broadcast::linked::mocks, should be updated to track the +/// Implementation of `commonware-consensus::Coordinator`. +#[derive(Clone)] +pub struct Coordinator { + view: u64, + identity: Poly, + signers: Vec

, + signers_map: HashMap, + share: Share, +} + +impl Coordinator

{ + pub fn new(identity: Poly, mut signers: Vec

, share: Share) -> Self { + // Setup signers + signers.sort(); + let mut signers_map = HashMap::new(); + for (index, validator) in signers.iter().enumerate() { + signers_map.insert(validator.clone(), index as u32); + } + + // Return coordinator + Self { + view: 0, + identity, + signers, + signers_map, + share, + } + } + + pub fn set_view(&mut self, view: u64) { + self.view = view; + } +} + +impl S for Coordinator

{ + type Index = Epoch; + type PublicKey = P; + + fn index(&self) -> Self::Index { + self.view + } + + fn signers(&self, _: Self::Index) -> Option<&Vec> { + Some(&self.signers) + } + + fn is_signer(&self, _: Self::Index, candidate: &Self::PublicKey) -> Option { + self.signers_map.get(candidate).cloned() + } + + fn sequencers(&self, _: Self::Index) -> Option<&Vec> { + Some(&self.signers) + } + + fn is_sequencer(&self, _: Self::Index, candidate: &Self::PublicKey) -> Option { + self.signers_map.get(candidate).cloned() + } +} + +impl T for Coordinator

{ + type Identity = Poly; + type Share = Share; + + fn identity(&self, _: Self::Index) -> Option<&Self::Identity> { + Some(&self.identity) + } + + fn share(&self, _: Self::Index) -> Option<&Self::Share> { + Some(&self.share) + } +} + +impl p2p::Coordinator for Coordinator

{ + type PublicKey = P; + + fn peers(&self) -> &Vec { + &self.signers + } + + fn peer_set_id(&self) -> u64 { + 0 + } +} \ No newline at end of file diff --git a/chain/src/actors/mempool/handler.rs b/chain/src/actors/mempool/handler.rs new file mode 100644 index 00000000..e0f014e2 --- /dev/null +++ b/chain/src/actors/mempool/handler.rs @@ -0,0 +1,68 @@ +use super::key::MultiIndex; +use bytes::Bytes; +use commonware_resolver::{p2p::Producer, Consumer}; +use futures::{ + channel::{mpsc, oneshot}, + SinkExt, +}; +use tracing::warn; + +pub enum Message { + Deliver { + key: MultiIndex, + value: Bytes, + response: oneshot::Sender, + }, + Produce { + key: MultiIndex, + response: oneshot::Sender, + }, +} + +/// Mailbox for resolver +#[derive(Clone)] +pub struct Handler { + sender: mpsc::Sender, +} + +impl Handler { + pub(super) fn new(sender: mpsc::Sender) -> Self { + Self { sender } + } +} + +impl Consumer for Handler { + type Key = MultiIndex; + type Value = Bytes; + type Failure = (); + + async fn deliver(&mut self, key: Self::Key, value: Self::Value) -> bool { + let (response, receiver) = oneshot::channel(); + self.sender + .send(Message::Deliver { + key, + value, + response, + }) + .await + .expect("Failed to send deliver"); + receiver.await.expect("Failed to receive deliver") + } + + async fn failed(&mut self, key: Self::Key, failture: Self::Failure) { + warn!(?key, ?failture, "failed at consumer"); + } +} + +impl Producer for Handler { + type Key = MultiIndex; + + async fn produce(&mut self, key: Self::Key) -> oneshot::Receiver { + let (response, receiver) = oneshot::channel(); + self.sender + .send(Message::Produce { key, response }) + .await + .expect("Failed to send produce"); + receiver + } +} diff --git a/chain/src/actors/mempool/ingress.rs b/chain/src/actors/mempool/ingress.rs new file mode 100644 index 00000000..e8dd1def --- /dev/null +++ b/chain/src/actors/mempool/ingress.rs @@ -0,0 +1,49 @@ +use commonware_utils::Array; +use commonware_cryptography::Digest; +use commonware_broadcast::{linked::Context, Application as A}; +use futures::{ channel::{mpsc, oneshot}, SinkExt}; + +pub struct Payload { + #[allow(dead_code)] + data: Vec +} + +pub enum Message { + Broadcast(D), + Verify(Context

, D, oneshot::Sender), +} + +#[derive(Clone)] +pub struct Mailbox { + sender: mpsc::Sender>, +} + +impl Mailbox { + pub(super) fn new(sender: mpsc::Sender>) -> Self { + Self { + sender + } + } + + pub async fn broadcast(&mut self, payload: D) { + let _ = self.sender.send(Message::Broadcast(payload)).await; + } +} + +impl A for Mailbox { + type Context = Context

; + type Digest = D; + + async fn verify( + &mut self, + context: Self::Context, + payload: Self::Digest, + ) -> oneshot::Receiver { + let (sender, receiver) = oneshot::channel(); + let _ = self + .sender + .send(Message::Verify(context, payload, sender)) + .await; + receiver + } +} \ No newline at end of file diff --git a/chain/src/actors/mempool/key.rs b/chain/src/actors/mempool/key.rs new file mode 100644 index 00000000..d6ed6b98 --- /dev/null +++ b/chain/src/actors/mempool/key.rs @@ -0,0 +1,132 @@ +use commonware_cryptography::sha256::Digest; +use commonware_utils::{Array, SizedSerialize}; +use std::{ + cmp::{Ord, PartialOrd}, + fmt::{Debug, Display}, + hash::Hash, + ops::Deref, +}; +use thiserror::Error; + +const SERIALIZED_LEN: usize = 1 + Digest::SERIALIZED_LEN; + +#[derive(Error, Debug, PartialEq)] +pub enum Error { + #[error("invalid length")] + InvalidLength, +} + +pub enum Value { + Digest(Digest), +} + +#[derive(Clone, Eq, PartialEq, Ord, PartialOrd, Hash)] +#[repr(transparent)] +pub struct MultiIndex([u8; SERIALIZED_LEN]); + +impl MultiIndex { + pub fn new(value: Value) -> Self { + let mut bytes = [0; SERIALIZED_LEN]; + match value { + Value::Digest(digest) => { + bytes[0] = 0; + bytes[1..].copy_from_slice(&digest); + } + } + Self(bytes) + } + + pub fn to_value(&self) -> Value { + match self.0[0] { + 0 => { + let bytes: [u8; Digest::SERIALIZED_LEN] = self.0[1..].try_into().unwrap(); + let digest = Digest::from(bytes); + Value::Digest(digest) + } + _ => unreachable!(), + } + } +} + +impl Array for MultiIndex { + type Error = Error; +} + +impl SizedSerialize for MultiIndex { + const SERIALIZED_LEN: usize = SERIALIZED_LEN; +} + +impl From<[u8; MultiIndex::SERIALIZED_LEN]> for MultiIndex { + fn from(value: [u8; MultiIndex::SERIALIZED_LEN]) -> Self { + Self(value) + } +} + +impl TryFrom<&[u8]> for MultiIndex { + type Error = Error; + + fn try_from(value: &[u8]) -> Result { + if value.len() != MultiIndex::SERIALIZED_LEN { + return Err(Error::InvalidLength); + } + let array: [u8; MultiIndex::SERIALIZED_LEN] = + value.try_into().map_err(|_| Error::InvalidLength)?; + Ok(Self(array)) + } +} + +impl TryFrom<&Vec> for MultiIndex { + type Error = Error; + + fn try_from(value: &Vec) -> Result { + Self::try_from(value.as_slice()) + } +} + +impl TryFrom> for MultiIndex { + type Error = Error; + + fn try_from(value: Vec) -> Result { + if value.len() != MultiIndex::SERIALIZED_LEN { + return Err(Error::InvalidLength); + } + + // If the length is correct, we can safely convert the vector into a boxed slice without any + // copies. + let boxed_slice = value.into_boxed_slice(); + let boxed_array: Box<[u8; MultiIndex::SERIALIZED_LEN]> = + boxed_slice.try_into().map_err(|_| Error::InvalidLength)?; + Ok(Self(*boxed_array)) + } +} + +impl AsRef<[u8]> for MultiIndex { + fn as_ref(&self) -> &[u8] { + &self.0 + } +} + +impl Deref for MultiIndex { + type Target = [u8]; + fn deref(&self) -> &[u8] { + &self.0 + } +} + +impl Debug for MultiIndex { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self.0[0] { + 0 => { + let bytes: [u8; Digest::SERIALIZED_LEN] = self.0[1..].try_into().unwrap(); + write!(f, "digest({})", Digest::from(bytes)) + } + _ => unreachable!(), + } + } +} + +impl Display for MultiIndex { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + Debug::fmt(self, f) + } +} diff --git a/chain/src/actors/mempool/mempool.rs b/chain/src/actors/mempool/mempool.rs new file mode 100644 index 00000000..a743977e --- /dev/null +++ b/chain/src/actors/mempool/mempool.rs @@ -0,0 +1,645 @@ +use std::{collections::{HashMap}, time::{Duration, SystemTime}}; + +use bytes::{BufMut, Bytes}; +use commonware_cryptography::{ed25519::PublicKey, sha256, Digest, Hasher, Sha256}; +use commonware_p2p::{utils::requester, Receiver, Recipients, Sender}; +use commonware_runtime::{Blob, Clock, Handle, Metrics, Spawner, Storage}; +use commonware_resolver::{p2p, Resolver}; +use commonware_storage::{ + archive::{self, translator::TwoCap, Archive, Identifier}, + journal::{self, variable::Journal}, +}; +use commonware_utils::SystemTimeExt; +use futures::{channel::{mpsc, oneshot}, SinkExt, StreamExt}; +use commonware_macros::select; +use governor::Quota; +use rand::Rng; +use tracing::{debug, warn, info}; +use governor::clock::Clock as GClock; +use super::{handler::{Handler, self}, key::{self, MultiIndex, Value}, ingress, coordinator::Coordinator, archive::Wrapped}; +use crate::maybe_delay_between; + +#[derive(Clone, Debug)] +pub struct Batch { + pub timestamp: SystemTime, + pub txs: Vec>, + pub digest: D, +} + +impl Batch + where Sha256: Hasher +{ + fn compute_digest(txs: &Vec>) -> D { + let mut hasher = Sha256::new(); + + for tx in txs.into_iter() { + hasher.update(tx.raw.as_ref()); + } + + hasher.finalize() + } + + pub fn new(txs: Vec>, timestamp: SystemTime) -> Self { + let digest = Self::compute_digest(&txs); + + Self { + txs, + digest, + timestamp + } + } + + pub fn serialize(&self) -> Vec { + let mut bytes = Vec::new(); + bytes.put_u64(self.timestamp.epoch_millis()); + bytes.put_u64(self.txs.len() as u64); + for tx in self.txs.iter() { + bytes.put_u64(tx.size()); + bytes.extend_from_slice(&tx.raw); + } + bytes + } + + pub fn deserialize(mut bytes: &[u8]) -> Option { + use bytes::Buf; + // We expect at least 18 bytes for the header + if bytes.remaining() < 18 { + return None; + } + let timestamp = bytes.get_u64(); + let timestamp = SystemTime::UNIX_EPOCH + Duration::from_millis(timestamp); + + let tx_count = bytes.get_u64(); + let mut txs = Vec::with_capacity(tx_count as usize); + for _ in 0..tx_count { + // For each transaction, first read the size (u64). + if bytes.remaining() < 8 { + return None; + } + let tx_size = bytes.get_u64() as usize; + // Ensure there are enough bytes left. + if bytes.remaining() < tx_size { + return None; + } + // Extract tx_size bytes. + let tx_bytes = bytes.copy_to_bytes(tx_size); + txs.push(RawTransaction::new(tx_bytes)); + } + // Compute the digest from the transactions. + let digest = Self::compute_digest(&txs); + // Since serialize did not include accepted and timestamp, we set accepted to false + // and set timestamp to the current time. + Some(Self { + timestamp, + txs, + digest, + }) + } + + pub fn contain_tx(&self, digest: &D) -> bool { + self.txs.iter().any(|tx| &tx.digest == digest) + } + + pub fn tx(&self, digest: &D) -> Option> { + self.txs.iter().find(|tx| &tx.digest == digest).cloned() + } +} + +#[derive(Clone, Debug)] +pub struct RawTransaction { + pub raw: Bytes, + + pub digest: D +} + +impl RawTransaction + where Sha256: Hasher +{ + fn compute_digest(raw: &Bytes) -> D { + let mut hasher = Sha256::new(); + hasher.update(&raw); + hasher.finalize() + } + + pub fn new(raw: Bytes) -> Self { + let digest = Self::compute_digest(&raw); + Self { + raw, + digest + } + } + + pub fn validate(&self) -> bool { + // TODO: implement validate here + true + } + + pub fn size(&self) -> u64 { + self.raw.len() as u64 + } +} + +pub enum Message { + // mark batch as accepted by the netowrk through the broadcast protocol + BatchAcknowledged { + digest: D, + response: oneshot::Sender + }, + // from rpc or websocket + SubmitTx { + payload: RawTransaction, + response: oneshot::Sender + }, + BatchConsumed { + digests: Vec, + block_number: u64, + response: oneshot::Sender, + }, + // proposer consume batches to produce a block + ConsumeBatches { + response: oneshot::Sender>> + }, + GetTx { + digest: D, + response: oneshot::Sender>> + }, + GetBatch { + digest: D, + response: oneshot::Sender>> + }, + GetBatchContainTx { + digest: D, + response: oneshot::Sender>> + } +} + +#[derive(Clone)] +pub struct Mailbox { + sender: mpsc::Sender> +} + +impl Mailbox { + pub fn new(sender: mpsc::Sender>) -> Self { + Self { + sender + } + } + + pub async fn acknowledge_batch(&mut self, digest: D) -> bool { + let (response, receiver) = oneshot::channel(); + self.sender + .send(Message::BatchAcknowledged { digest, response}) + .await + .expect("failed to acknowledge batch"); + + receiver.await.expect("failed to receive batch acknowledge") + } + + pub async fn issue_tx(&mut self, tx: RawTransaction) -> bool { + let (response, receiver) = oneshot::channel(); + self.sender + .send(Message::SubmitTx { payload: tx, response }) + .await + .expect("failed to issue tx"); + + receiver.await.expect("failed to receive tx issue status") + } + + pub async fn consume_batches(&mut self) -> Vec> { + let (response, receiver) = oneshot::channel(); + self.sender + .send(Message::ConsumeBatches { response }) + .await + .expect("failed to consume batches"); + + receiver.await.expect("failed to receive batches") + } + + pub async fn consumed_batches(&mut self, digests: Vec, block_number: u64) -> bool { + let (response, receiver) = oneshot::channel(); + self.sender + .send(Message::BatchConsumed { digests, block_number, response }) + .await + .expect("failed to mark batches as consumed"); + + receiver.await.expect("failed to mark batches as consumed") + } + + pub async fn get_tx(&mut self, digest: D) -> Option> { + let (response, receiver) = oneshot::channel(); + self.sender + .send(Message::GetTx { digest, response }) + .await + .expect("failed to get tx"); + + receiver.await.expect("failed to receive tx") + } + + pub async fn get_batch(&mut self, digest: D) -> Option> { + let (response, receiver) = oneshot::channel(); + self.sender + .send(Message::GetBatch { digest, response }) + .await + .expect("failed to get batch"); + + receiver.await.expect("failed to receive batch") + } + + pub async fn get_batch_contain_tx(&mut self, digest: D) -> Option> { + let (response, receiver) = oneshot::channel(); + self.sender + .send(Message::GetBatchContainTx { digest, response }) + .await + .expect("failed to get batch"); + + receiver.await.expect("failed to receive batch") + } +} + +pub struct Config { + pub batch_propose_interval: Duration, + pub batch_size_limit: u64, + pub backfill_quota: Quota, + pub mailbox_size: usize, + pub public_key: PublicKey, + pub partition_prefix: String, + pub block_height: u64, +} + +pub struct Mempool< + B: Blob, + R: Rng + Clock + GClock + Spawner + Metrics + Storage, + D: Digest + Into + From +> { + context: R, + + public_key: PublicKey, + + + batches: HashMap>, + + acknowledged: Vec, + + //TODO: replace the following two + accepted: Archive, + consumed: Archive, + + txs: Vec>, + + mailbox: mpsc::Receiver>, + mailbox_size: usize, + + block_height_seen: u64, + + batch_propose_interval: Duration, + batch_size_limit: u64, + backfill_quota: Quota, +} + +impl< + B: Blob, + R: Rng + Clock + GClock + Spawner + Metrics + Storage, + D: Digest + Into + From +> Mempool + where + Sha256: Hasher, +{ + pub async fn init(context: R, cfg: Config) -> (Self, Mailbox) { + let accepted_journal = Journal::init( + context.with_label("accepted_journal"), + journal::variable::Config { + partition: format!("{}-acceptances", cfg.partition_prefix), + }) + .await + .expect("Failed to initialize accepted journal"); + let accepted_archive = Archive::init( + context.with_label("accepted_archive"), + accepted_journal, + archive::Config { + translator: TwoCap, + section_mask: 0xffff_ffff_fff0_0000u64, + pending_writes: 0, + replay_concurrency: 4, + compression: Some(3), + }) + .await + .expect("Failed to initialize accepted archive"); + + let consumed_journal = Journal::init( + context.with_label("consumed_journal"), + journal::variable::Config { + partition: format!("{}-consumptions", cfg.partition_prefix), + }) + .await + .expect("Failed to initialize consumed journal"); + let consumed_archive = Archive::init( + context.with_label("consumed_archive"), + consumed_journal, + archive::Config { + translator: TwoCap, + section_mask: 0xffff_ffff_fff0_0000u64, + pending_writes: 0, + replay_concurrency: 4, + compression: Some(3), + }) + .await + .expect("Failed to initialize consumed archive"); + + let (sender, receiver) = mpsc::channel(1024); + (Self { + context, + public_key: cfg.public_key, + batches: HashMap::new(), + acknowledged: Vec::new(), + accepted: accepted_archive, + consumed: consumed_archive, + + txs: vec![], + + block_height_seen: cfg.block_height, + + mailbox: receiver, + mailbox_size: cfg.mailbox_size, + + batch_propose_interval: cfg.batch_propose_interval, + batch_size_limit: cfg.batch_size_limit, + backfill_quota: cfg.backfill_quota, + }, Mailbox::new(sender)) + } + + pub fn start( + mut self, + batch_network: ( + impl Sender, + impl Receiver, + ), + backfill_network: ( + impl Sender, + impl Receiver, + ), + coordinator: Coordinator, + app_mailbox: ingress::Mailbox + ) -> Handle<()> { + self.context.spawn_ref()(self.run(batch_network, backfill_network, coordinator, app_mailbox)) + } + + pub async fn run( + mut self, + mut batch_network: ( + impl Sender, + impl Receiver, + ), + backfill_network: ( + impl Sender, + impl Receiver, + ), + coordinator: Coordinator, + mut app_mailbox: ingress::Mailbox + ) { + let (handler_sender, mut handler_receiver) = mpsc::channel(self.mailbox_size); + let handler = Handler::new(handler_sender); + let (resolver_engine, mut resolver) = p2p::Engine::new( + self.context.with_label("resolver"), + p2p::Config { + coordinator: coordinator, + consumer: handler.clone(), + producer: handler, + mailbox_size: self.mailbox_size, + requester_config: requester::Config { + public_key: self.public_key.clone(), + rate_limit: self.backfill_quota, + initial: Duration::from_secs(1), + timeout: Duration::from_secs(2), + }, + fetch_retry_timeout: Duration::from_millis(100), // prevent busy loop + priority_requests: false, + priority_responses: false, + }, + ); + resolver_engine.start(backfill_network); + + let mut waiters: HashMap>>>> = HashMap::new(); + let mut propose_timeout = self.context.current() + self.batch_propose_interval; + let accepted = Wrapped::new(self.accepted); + let consumed = Wrapped::new(self.consumed); + loop { + // Clear dead waiters + waiters.retain(|_, waiters| { + waiters.retain(|waiter| !waiter.is_canceled()); + !waiters.is_empty() + }); + + select! { + mailbox_message = self.mailbox.next() => { + let Some(message) = mailbox_message else { + info!("Mailbox closed, terminating..."); + return; + }; + match message { + Message::SubmitTx { payload, response } => { + if !payload.validate() { + let _ = response.send(false); + return; + } + + self.txs.push(payload); + let _ = response.send(true); + }, + // batch ackowledged by the network + Message::BatchAcknowledged { digest, response } => { + debug!("batch accepted by the network: {}", digest); + + self.acknowledged.push(digest); + if let Some(batch) = self.batches.get_mut(&digest) { + accepted.put(self.block_height_seen, digest, batch.serialize().into()).await.expect("unable to store accepted batch"); + } else { + panic!("batch not found: {}", digest); + } + let _ = response.send(true); + }, + Message::ConsumeBatches { response } => { + // we do not remove anything from the mempool state as the digests/batches may not be consumed + let batches = self.acknowledged.iter().filter_map(|digest| { + let Some(batch) = self.batches.get(&digest) else { + // shouldn't happen + panic!("batch not found: {}", digest); + }; + Some(batch.clone()) + }).collect(); + let _ = response.send(batches); + }, + // received when a block is finalized, i.e. finalization message is received, + // remove consumed batches from buffer & put them in consumed archive + Message:: BatchConsumed { digests, block_number, response } => { + // update the seen height + self.block_height_seen = block_number; + + let consumed_keys: Vec = self.batches.iter() + .filter_map(|(digest, batch)| { + if digests.contains(&batch.digest) { + Some(digest.clone()) + } else { + None + } + }) + .collect(); + let consumed_keys_len = consumed_keys.len(); + + // not all provided keys are consumed, there must be state corruption, panic immediately + if consumed_keys_len != digests.len() { + panic!("not all provided batch digests consumed, provided={:?}, consumed={:?}", digests, consumed_keys); + } + + // remove digests and batches + let consumed_batches: Vec> = consumed_keys.into_iter() + .filter_map(|key| self.batches.remove(&key)) + .collect(); + + self.acknowledged.retain(|digest| !digests.contains(digest)); + + for batch in consumed_batches.iter() { + consumed.put(block_number, batch.digest, batch.serialize().into()).await.expect("Failed to insert accepted batch"); + } + + + let _ = response.send(true); + }, + // for validators, this should be only called when they receive + // 1. a digest from broadcast primitive + // 2. a block with a list of references of batches + // both of the above should only happen at their verification stage + Message::GetBatch { digest, response } => { + // fetch in buffer, i.e. accepted + if let Some(batch) = self.batches.get(&digest).cloned() { + let _ = response.send(Some(batch)); + continue; + }; + // fetch in consumed + let consumed = consumed.get(Identifier::Key(&digest)) + .await + .expect("Failed to get consumed batch"); + if let Some(consumed) = consumed { + let consumed = Batch::deserialize(&consumed).expect("unable to deserialize batch"); + let _ = response.send(Some(consumed)); + continue; + } + + // not found in the above, request from other peers + resolver.fetch(MultiIndex::new(Value::Digest(digest.into()))).await; + waiters.entry(digest).or_default().push(response); + }, + Message::GetBatchContainTx { digest, response } => { + // TODO: optimize this naive way of seaching + let pair = self.batches.iter().find(|(_, batch)| batch.contain_tx(&digest)); + if let Some((_, batch)) = pair { + let _ = response.send(Some(batch.clone())); + } else { + let _ = response.send(None); + } + }, + Message::GetTx { digest, response } => { + // TODO: optimize this naive way of seaching + let pair = self.batches.iter().find(|(_, batch)| batch.contain_tx(&digest)); + if let Some((_, batch)) = pair { + let _ = response.send(batch.tx(&digest)); + } else { + let _ = response.send(None); + } + }, + } + }, + // propose a batch in a given interval + _ = self.context.sleep_until(propose_timeout) => { + let mut size = 0; + let mut txs_cnt = 0; + for tx in self.txs.iter() { + size += tx.size(); + txs_cnt += 1; + + if size > self.batch_size_limit { + break; + } + } + + if txs_cnt == 0 { + propose_timeout = self.context.current() + self.batch_propose_interval; + continue; + } + + let batch = Batch::new(self.txs.drain(..txs_cnt).collect(), self.context.current()); + self.batches.insert(batch.digest, batch.clone()); + + debug!("broadcasting batch & digest={}, peer={}", batch.digest, self.public_key.clone()); + maybe_delay_between! { + self.context, + app_mailbox.broadcast(batch.digest).await; + batch_network.0.send(Recipients::All, batch.serialize().into(), true).await.expect("failed to broadcast batch"); + } + + // reset the timeout + propose_timeout = self.context.current() + self.batch_propose_interval; + }, + batch_message = batch_network.1.recv() => { + let (sender, message) = batch_message.expect("Batch broadcast closed"); + let Some(batch) = Batch::deserialize(&message) else { + warn!(?sender, "failed to deserialize batch"); + continue; + }; + + debug!(?sender, digest=?batch.digest, receiver=?self.public_key, "received batch"); + let _ = self.batches.entry(batch.digest).or_insert(batch); + }, + + // handle batch request, a validator will issue batches it has + handler_message = handler_receiver.next() => { + let message = handler_message.expect("Handler closed"); + match message { + handler::Message::Produce { key, response } => { + match key.to_value() { + key::Value::Digest(digest) => { + if let Some(batch) = self.batches.get(&D::from(digest)).cloned() { + let _ = response.send(batch.serialize().into()); + continue; + }; + + let consumed = consumed.get(Identifier::Key(&D::from(digest))) + .await + .expect("Failed to get accepted batch"); + if let Some(consumed) = consumed { + let _ = response.send(consumed); + continue; + } + debug!(?digest, "missing batch"); + } + } + }, + handler::Message::Deliver { key, value, response } => { + match key.to_value() { + key::Value::Digest(digest) => { + let batch = Batch::deserialize(&value).expect("Failed to deserialize batch"); + if batch.digest.into() != digest { + let _ = response.send(false); + continue; + } + + if let Some(waiters) = waiters.remove(&batch.digest) { + debug!(?batch.digest, ?self.public_key, "waiters resolved via batch"); + for waiter in waiters { + let _ = waiter.send(Some(batch.clone())); + } + } + + debug!(?batch.digest, "receive a batch from resolver"); + // add received batch to buffer if not exists + self.batches.entry(batch.digest).or_insert(batch); + + let _ = response.send(true); + } + } + }, + } + } + } + } + } +} \ No newline at end of file diff --git a/chain/src/actors/mempool/mod.rs b/chain/src/actors/mempool/mod.rs new file mode 100644 index 00000000..0b73aece --- /dev/null +++ b/chain/src/actors/mempool/mod.rs @@ -0,0 +1,387 @@ +pub mod actor; +pub mod ingress; +pub mod coordinator; +pub mod collector; +pub mod mempool; +pub mod handler; +pub mod key; +pub mod archive; + +#[cfg(test)] +mod tests { + use core::panic; + use std::{collections::{BTreeMap, HashMap}, num::NonZeroU32, sync::{Arc, Mutex}, time::Duration}; + use bytes::Bytes; + use commonware_broadcast::linked::{Config, Engine}; + + use governor::Quota; + use tracing::{debug, info, warn}; + + use commonware_cryptography::{bls12381::{dkg, primitives::{group::Share, poly}}, ed25519::PublicKey, sha256, Ed25519, Scheme}; + use commonware_macros::test_traced; + use commonware_p2p::simulated::{Oracle, Receiver, Sender, Link, Network}; + use commonware_runtime::{deterministic::{Context, Executor}, Clock, Metrics, Runner, Spawner}; + use futures::channel::mpsc; + + use super::{ingress, mempool::{self, Mempool, RawTransaction}}; + + type Registrations

= HashMap, Receiver

), + (Sender

, Receiver

), + (Sender

, Receiver

), + (Sender

, Receiver

) + )>; + + #[allow(dead_code)] + enum Action { + Link(Link), + Update(Link), + Unlink, + } + + async fn register_validators( + oracle: &mut Oracle, + validators: &[PublicKey], + ) -> HashMap, Receiver), + (Sender, Receiver), + (Sender, Receiver), + (Sender, Receiver), + )> { + let mut registrations = HashMap::new(); + for validator in validators.iter() { + let (digest_sender, digest_receiver) = oracle.register(validator.clone(), 4).await.unwrap(); + let (ack_sender, ack_receiver) = oracle.register(validator.clone(), 5).await.unwrap(); + let (batch_sender, batch_receiver) = oracle.register(validator.clone(), 6).await.unwrap(); + let (batch_backfill_sender, batch_backfill_receiver) = oracle.register(validator.clone(), 7).await.unwrap(); + registrations.insert(validator.clone(), ( + (digest_sender, digest_receiver), + (ack_sender, ack_receiver), + (batch_sender, batch_receiver), + (batch_backfill_sender, batch_backfill_receiver), + )); + } + registrations + } + + async fn link_validators( + oracle: &mut Oracle, + validators: &[PublicKey], + link: Link, + restrict_to: Option bool>, + ) { + for (i1, v1) in validators.iter().enumerate() { + for (i2, v2) in validators.iter().enumerate() { + // Ignore self + if v2 == v1 { + continue; + } + + // Restrict to certain connections + if let Some(f) = restrict_to { + if !f(validators.len(), i1, i2) { + continue; + } + } + + // Add link + oracle + .add_link(v1.clone(), v2.clone(), link.clone()) + .await + .unwrap(); + } + } + } + + async fn initialize_simulation( + context: Context, + num_validators: u32, + shares_vec: &mut [Share], + ) -> ( + Oracle, + Vec<(PublicKey, Ed25519, Share)>, + Vec, + Registrations, + ) { + let (network, mut oracle) = Network::new( + context.with_label("network"), + commonware_p2p::simulated::Config { + max_size: 1024 * 1024, + }, + ); + network.start(); + + let mut schemes = (0..num_validators) + .map(|i| Ed25519::from_seed(i as u64)) + .collect::>(); + schemes.sort_by_key(|s| s.public_key()); + let validators: Vec<(PublicKey, Ed25519, Share)> = schemes + .iter() + .enumerate() + .map(|(i, scheme)| (scheme.public_key(), scheme.clone(), shares_vec[i])) + .collect(); + let pks = validators + .iter() + .map(|(pk, _, _)| pk.clone()) + .collect::>(); + + let registrations = register_validators(&mut oracle, &pks).await; + let link = Link { + latency: 10.0, + jitter: 1.0, + success_rate: 1.0, + }; + link_validators(&mut oracle, &pks, link, None).await; + (oracle, validators, pks, registrations) + } + + #[allow(clippy::too_many_arguments)] + async fn spawn_validator_engines( + context: Context, + identity: poly::Public, + pks: &[PublicKey], + validators: &[(PublicKey, Ed25519, Share)], + registrations: &mut Registrations, + collectors: &mut BTreeMap>, + refresh_epoch_timeout: Duration, + rebroadcast_timeout: Duration, + ) -> BTreeMap> { + let mut mailboxes = BTreeMap::new(); + let namespace = b"my testing namespace"; + for (validator, scheme, share) in validators.iter() { + let (mempool, mempool_mailbox) = Mempool::init(context.with_label("mempool"), mempool::Config { + batch_propose_interval: Duration::from_millis(500), + batch_size_limit: 1024*1024, + backfill_quota: Quota::per_second(NonZeroU32::new(10).unwrap()), + mailbox_size: 1024, + public_key: scheme.public_key(), + block_height: 0, + partition_prefix: format!("mempool"), + }).await; + mailboxes.insert(validator.clone(), mempool_mailbox.clone()); + + + let context = context.with_label(&validator.to_string()); + let mut coordinator = super::coordinator::Coordinator::::new( + identity.clone(), + pks.to_vec(), + *share, + ); + coordinator.set_view(111); + + let (app, app_mailbox) = + super::actor::Actor::::new(); + + let collector_mempool_mailbox = mempool_mailbox.clone(); + let (collector, collector_mailbox) = + super::collector::Collector::::new( + namespace, + *poly::public(&identity), + ); + context.with_label("collector").spawn(move |_| collector.run(collector_mempool_mailbox)); + collectors.insert(validator.clone(), collector_mailbox); + + let (engine, mailbox) = Engine::new( + context.with_label("engine"), + Config { + crypto: scheme.clone(), + application: app_mailbox.clone(), + collector: collectors.get(validator).unwrap().clone(), + coordinator: coordinator.clone(), + mailbox_size: 1024, + verify_concurrent: 1024, + namespace: namespace.to_vec(), + epoch_bounds: (1, 1), + height_bound: 2, + refresh_epoch_timeout, + rebroadcast_timeout, + journal_heights_per_section: 10, + journal_replay_concurrency: 1, + journal_name_prefix: format!("broadcast-linked-seq/{}/", validator), + }, + ); + + context.with_label("app").spawn(move |_| app.run(mailbox, mempool_mailbox)); + let ((a1, a2), (b1, b2), (c1, c2), (d1, d2)) = registrations.remove(validator).unwrap(); + engine.start((a1, a2), (b1, b2)); + mempool.start((c1, c2), (d1, d2), coordinator, app_mailbox.clone()); + } + mailboxes + } + + async fn spawn_mempools( + context: Context, + identity: poly::Public, + pks: &[PublicKey], + validators: &[(PublicKey, Ed25519, Share)], + registrations: &mut Registrations, + app_mailbox: &mut ingress::Mailbox, + // mailboxes: &mut BTreeMap>, + ) -> BTreeMap> { + let mut mailboxes= BTreeMap::new(); + for (validator, _, share) in validators.iter() { + let context = context.with_label(&validator.to_string()); + let mut coordinator = super::coordinator::Coordinator::::new( + identity.clone(), + pks.to_vec(), + *share, + ); + coordinator.set_view(111); + + let (mempool, mailbox) = Mempool::init( + context.with_label("mempool"), + mempool::Config { + batch_propose_interval: Duration::from_millis(500), + batch_size_limit: 1024*1024, + backfill_quota: Quota::per_second(NonZeroU32::new(10).unwrap()), + mailbox_size: 1024, + public_key: validator.clone(), + block_height: 0, + partition_prefix: format!("mempool"), + } + ).await; + mailboxes.insert(validator.clone(), mailbox); + let ((_, _), (_, _), (c1, c2), (d1, d2)) = registrations.remove(validator).unwrap(); + mempool.start((c1, c2), (d1, d2),coordinator, app_mailbox.clone()); + } + + mailboxes + } + + async fn spawn_tx_issuer_and_wait( + context: Context, + mailboxes: Arc>>>, + num_txs: u32, + wait_batch_acknowlegement: bool, + consume_batch: bool, + ) { + context + .clone() + .with_label("tx issuer") + .spawn(move |context| async move { + let mut mailbox_vec: Vec> = { + let guard = mailboxes.lock().unwrap(); + guard.values().cloned().collect() + }; + + if mailbox_vec.len() <= 1 { + panic!("insuffient mempool nodes spawned, have {}", mailbox_vec.len()); + } + + let Some(mut mailbox)= mailbox_vec.pop() else { + panic!("no single mailbox provided"); + }; + + + // issue tx to the first validator + let mut digests = Vec::new(); + for i in 0..num_txs { + let tx = RawTransaction::new(Bytes::from(format!("tx-{}", i))); + let submission_res = mailbox.issue_tx(tx.clone()).await; + if !submission_res { + warn!(?tx.digest, "failed to submit tx"); + continue; + } + debug!("tx submitted: {}", tx.digest); + digests.push(tx.digest); + } + + if digests.len() == 0 { + panic!("zero txs issued"); + } + + context.sleep(Duration::from_secs(5)).await; + + // check if the tx appear in other validators + for mut mailbox in mailbox_vec { + for digest in digests.iter() { + let Some(tx) = mailbox.get_tx(digest.clone()).await else { + panic!("digest: {} not found at mailbox", digest); + }; + + info!("tx found at mempool: {}", tx.digest); + + if wait_batch_acknowlegement { + let Some(batch) = mailbox.get_batch_contain_tx(digest.clone()).await else { + panic!("batch not found"); + }; + info!("batch contain tx {} acknowledged", batch.digest); + } + if consume_batch { + // 1. consume the batches + let batches = mailbox.consume_batches().await; + assert!(batches.len() != 0, "expected some batches to consume"); + info!("consumed batches: {:?}", batches); + // 2. mark the batches as consumed + info!("marking batches as consumed"); + let digests = batches.iter().map(|batch| batch.digest).collect(); + let _ = mailbox.consumed_batches(digests, 0).await; + // 3. try consume batches again + let batches = mailbox.consume_batches().await; + assert!(batches.len() == 0, "expected zero batches to consume"); + info!("zero batches left after consumption"); + } + } + } + }).await.unwrap(); + } + + #[test_traced] + fn test_all_online() { + let num_validators: u32 = 4; + let quorum: u32 = 3; + let (runner, mut context, _) = Executor::timed(Duration::from_secs(30)); + let (identity, mut shares_vec) = dkg::ops::generate_shares(&mut context, None, num_validators, quorum); + shares_vec.sort_by(|a, b| a.index.cmp(&b.index)); + + runner.start(async move { + let (_oracle, validators, pks, mut registrations) = initialize_simulation( + context.with_label("simulation"), + num_validators, + &mut shares_vec).await; + let mut collectors = BTreeMap::>::new(); + let mailboxes = spawn_validator_engines( + context.with_label("validator"), + identity.clone(), + &pks, + &validators, + &mut registrations, + &mut collectors, + Duration::from_millis(100), + Duration::from_secs(5) + ).await; + let guarded_mailboxes = Arc::new(Mutex::new(mailboxes)); + spawn_tx_issuer_and_wait(context.with_label("tx_issuer"), guarded_mailboxes, 1, true, true).await; + }); + } + + #[test_traced] + fn test_mempool_p2p() { + let num_validators: u32 = 4; + let quorum: u32 = 3; + let (runner, mut context, _) = Executor::timed(Duration::from_secs(30)); + let (identity, mut shares_vec) = dkg::ops::generate_shares(&mut context, None, num_validators, quorum); + shares_vec.sort_by(|a, b| a.index.cmp(&b.index)); + + info!("mempool p2p test started"); + let (app_mailbox_sender, _) = mpsc::channel(1024); + let mut app_mailbox = ingress::Mailbox::new(app_mailbox_sender); + + runner.start(async move { + let (_oracle, validators, pks, mut registrations ) = initialize_simulation( + context.with_label("simulation"), + num_validators, + &mut shares_vec).await; + let mailboxes = spawn_mempools( + context.with_label("mempool"), + identity, + &pks, + &validators, + &mut registrations, + &mut app_mailbox, + ).await; + let guarded_mailboxes = Arc::new(Mutex::new(mailboxes)); + spawn_tx_issuer_and_wait(context.with_label("tx_issuer"), guarded_mailboxes, 1, false, false).await; + }); + } +} \ No newline at end of file diff --git a/chain/src/actors/mod.rs b/chain/src/actors/mod.rs index 9e920fb4..3bac1080 100644 --- a/chain/src/actors/mod.rs +++ b/chain/src/actors/mod.rs @@ -1,2 +1,3 @@ pub mod application; pub mod syncer; +pub mod mempool; diff --git a/chain/src/actors/syncer/actor.rs b/chain/src/actors/syncer/actor.rs index 1d64d4fb..6ae3feab 100644 --- a/chain/src/actors/syncer/actor.rs +++ b/chain/src/actors/syncer/actor.rs @@ -287,7 +287,7 @@ impl, I: Index let mut resolver = resolver.clone(); let last_view_processed = last_view_processed.clone(); let verified = verified.clone(); - let notarized = notarized.clone(); + let notarized: Wrapped = notarized.clone(); let finalized = finalized.clone(); let blocks = blocks.clone(); move |_| async move { diff --git a/chain/src/bin/setup.rs b/chain/src/bin/setup.rs index 08c7b403..32c72cd0 100644 --- a/chain/src/bin/setup.rs +++ b/chain/src/bin/setup.rs @@ -8,10 +8,10 @@ use commonware_cryptography::{ ed25519::PublicKey, Ed25519, Scheme, }; -use commonware_deployer::ec2; +use commonware_deployer::ec2::{self}; use commonware_utils::{from_hex_formatted, hex, quorum}; use rand::{rngs::OsRng, seq::IteratorRandom}; -use std::{collections::BTreeMap, fs, ops::AddAssign}; +use std::{collections::BTreeMap, fs, path::Path, ops::AddAssign}; use tracing::{error, info}; use uuid::Uuid; @@ -19,6 +19,7 @@ const BINARY_NAME: &str = "validator"; const PORT: u16 = 4545; const STORAGE_CLASS: &str = "gp3"; const DASHBOARD_FILE: &str = "dashboard.json"; +const MONITORING_PORT: u16 = 9000; fn main() { // Initialize logger @@ -110,6 +111,65 @@ fn main() { .value_parser(value_parser!(String)), ), ) + .subcommand( + Command::new("generate-local") + .about("Generate configuration files for an alto chain locally") + .arg( + Arg::new("storage_dir") + .long("storage-dir") + .required(false) + .default_value("/tmp/alto") + .value_parser(value_parser!(String)) + ) + .arg( + Arg::new("peers") + .long("peers") + .required(true) + .value_parser(value_parser!(usize)), + ) + .arg( + Arg::new("bootstrappers") + .long("bootstrappers") + .required(true) + .value_parser(value_parser!(usize)), + ) + .arg( + Arg::new("worker_threads") + .long("worker-threads") + .required(true) + .value_parser(value_parser!(usize)), + ) + .arg( + Arg::new("log_level") + .long("log-level") + .required(true) + .value_parser(value_parser!(String)), + ) + .arg( + Arg::new("message_backlog") + .long("message-backlog") + .required(true) + .value_parser(value_parser!(usize)), + ) + .arg( + Arg::new("mailbox_size") + .long("mailbox-size") + .required(true) + .value_parser(value_parser!(usize)), + ) + .arg( + Arg::new("dashboard") + .long("dashboard") + .required(true) + .value_parser(value_parser!(String)), + ) + .arg( + Arg::new("output") + .long("output") + .required(true) + .value_parser(value_parser!(String)), + ), + ) .subcommand( Command::new("indexer") .about("Add indexer support for an alto chain.") @@ -155,6 +215,7 @@ fn main() { // Handle subcommands match matches.subcommand() { Some(("generate", sub_matches)) => generate(sub_matches), + Some(("generate-local", sub_matches)) => generate_local(sub_matches), Some(("indexer", sub_matches)) => indexer(sub_matches), Some(("explorer", sub_matches)) => explorer(sub_matches), _ => { @@ -257,6 +318,8 @@ fn generate(sub_matches: &ArgMatches) { worker_threads, log_level: log_level.clone(), + metrics_port: 9090, + allowed_peers: allowed_peers.clone(), bootstrappers: bootstrappers.clone(), @@ -318,6 +381,160 @@ fn generate(sub_matches: &ArgMatches) { info!(path = "config.yaml", "wrote configuration file"); } +fn generate_local(sub_matches: &ArgMatches) { + // Extract arguments + let storage_dir = sub_matches.get_one::("storage_dir").unwrap().clone(); + let peers = *sub_matches.get_one::("peers").unwrap(); + let bootstrappers = *sub_matches.get_one::("bootstrappers").unwrap(); + + let worker_threads = *sub_matches.get_one::("worker_threads").unwrap(); + let log_level = sub_matches.get_one::("log_level").unwrap().clone(); + let message_backlog = *sub_matches.get_one::("message_backlog").unwrap(); + let mailbox_size = *sub_matches.get_one::("mailbox_size").unwrap(); + let dashboard = sub_matches.get_one::("dashboard").unwrap().clone(); + let output = sub_matches.get_one::("output").unwrap().clone(); + + // Construct output path + let raw_current_dir = std::env::current_dir().unwrap(); + let current_dir = raw_current_dir.to_str().unwrap(); + let output = format!("{}/{}", current_dir, output); + + // Check if output directory exists + if fs::metadata(&output).is_ok() { + error!("output directory already exists: {}", output); + std::process::exit(1); + } + + // Generate UUID + let tag = Uuid::new_v4().to_string(); + info!(tag, "generated deployment tag"); + + // Generate peers + assert!( + bootstrappers <= peers, + "bootstrappers must be less than or equal to peers" + ); + let mut peer_schemes = (0..peers) + .map(|_| Ed25519::new(&mut OsRng)) + .collect::>(); + peer_schemes.sort_by_key(|scheme| scheme.public_key()); + let allowed_peers: Vec = peer_schemes + .iter() + .map(|scheme| scheme.public_key().to_string()) + .collect(); + let bootstrappers = allowed_peers + .iter() + .choose_multiple(&mut OsRng, bootstrappers) + .into_iter() + .cloned() + .collect::>(); + + // Generate consensus key + let peers_u32 = peers as u32; + let threshold = quorum(peers_u32).expect("unable to derive quorum"); + let (identity, shares) = ops::generate_shares(&mut OsRng, None, peers_u32, threshold); + info!( + identity = hex(&poly::public(&identity).serialize()), + "generated network key" + ); + + // Generate instance configurations + let mut peer_configs = Vec::new(); + let mut instance_configs = Vec::new(); + let mut peers = Vec::new(); + + for (index, scheme) in peer_schemes.iter().enumerate() { + // Create peer config + let name = format!("validator{}", index); + let peer_config_file = format!("{}.yaml", name); + + let peer_directory = Path::new(&storage_dir).join(format!("validator{}", index)); + let peer_config = Config { private_key: scheme.private_key().to_string(), + share: hex(&shares[index].serialize()), + identity: hex(&identity.serialize()), + + port: PORT + index as u16, + directory: peer_directory.to_string_lossy().into_owned(), + worker_threads, + log_level: log_level.clone(), + + metrics_port: 9090 + index as u16, + + allowed_peers: allowed_peers.clone(), + bootstrappers: bootstrappers.clone(), + + message_backlog, + mailbox_size, + + indexer: None, + }; + peer_configs.push((peer_config_file.clone(), peer_config)); + + // Create instance config + let region = "local".to_string(); + let instance = ec2::InstanceConfig { + name: scheme.public_key().to_string(), + region, + instance_type: "local".to_string(), + storage_size: 10, + storage_class: "local".to_string(), + binary: BINARY_NAME.to_string(), + config: peer_config_file, + }; + instance_configs.push(instance); + + // Create peer config + let peer = ec2::Peer { + name: scheme.public_key().to_string(), + region: "local".to_string(), + ip: "127.0.0.1".parse().expect("invalid IP address"), + }; + peers.push(peer); + } + + // Generate root config file + let config = ec2::Config { + tag, + instances: instance_configs, + monitoring: ec2::MonitoringConfig { + instance_type: "local".to_string(), + storage_size: 10, + storage_class: "local".to_string(), + dashboard: "dashboard.json".to_string(), + }, + ports: vec![ec2::PortConfig { + protocol: "tcp".to_string(), + port: MONITORING_PORT, + cidr: "0.0.0.0/0".to_string(), + }], + }; + + // Write configuration files + fs::create_dir_all(&output).unwrap(); + fs::copy( + format!("{}/{}", current_dir, dashboard), + format!("{}/dashboard.json", output), + ) + .unwrap(); + for (peer_config_file, peer_config) in peer_configs { + let path = format!("{}/{}", output, peer_config_file); + let file = fs::File::create(&path).unwrap(); + serde_yaml::to_writer(file, &peer_config).unwrap(); + info!(path = peer_config_file, "wrote peer configuration file"); + } + + let path = format!("{}/config.yaml", output); + let file = fs::File::create(&path).unwrap(); + serde_yaml::to_writer(file, &config).unwrap(); + info!(path = "config.yaml", "wrote configuration file"); + + let peers_path = format!("{}/peers.yaml", output); + let file = fs::File::create(peers_path).unwrap(); + serde_yaml::to_writer(file, &ec2::Peers{peers}).unwrap(); + info!(path = "peers.yaml", "wrote peers configuration file"); +} + + fn indexer(sub_matches: &ArgMatches) { // Extract arguments let count = *sub_matches.get_one::("count").unwrap(); diff --git a/chain/src/bin/validator.rs b/chain/src/bin/validator.rs index 62ce4548..d1d730c4 100644 --- a/chain/src/bin/validator.rs +++ b/chain/src/bin/validator.rs @@ -1,15 +1,14 @@ -use alto_chain::{engine, Config}; +use alto_chain::{actors::mempool::{self, mempool::Mempool}, engine, Config}; use alto_client::Client; use alto_types::P2P_NAMESPACE; use axum::{routing::get, serve, Extension, Router}; use clap::{Arg, Command}; +use commonware_broadcast::linked; use commonware_cryptography::{ bls12381::primitives::{ group::{self, Element}, poly, - }, - ed25519::{PrivateKey, PublicKey}, - Ed25519, Scheme, + }, ed25519::{PrivateKey, PublicKey}, sha256, Ed25519, Scheme }; use commonware_deployer::ec2::Peers; use commonware_p2p::authenticated; @@ -31,12 +30,17 @@ use sysinfo::{Disks, System}; use tracing::{error, info, Level}; const SYSTEM_METRICS_REFRESH: Duration = Duration::from_secs(5); -const METRICS_PORT: u16 = 9090; +// const METRICS_PORT: u16 = 9090; const VOTER_CHANNEL: u32 = 0; const RESOLVER_CHANNEL: u32 = 1; const BROADCASTER_CHANNEL: u32 = 2; const BACKFILLER_CHANNEL: u32 = 3; +const MEMPOOL_DIGEST_CHANNEL: u32 = 4; +const MEMPOOL_ACK_CHANNEL: u32 = 5; +const MEMPOOL_BATCH_CHANNEL: u32 = 6; +const MMEPOOL_BACKFILL_CHANNEL: u32 = 7; + const LEADER_TIMEOUT: Duration = Duration::from_secs(1); const NOTARIZATION_TIMEOUT: Duration = Duration::from_secs(2); @@ -99,6 +103,7 @@ fn main() { let identity = poly::Public::deserialize(&identity, threshold).expect("Identity is invalid"); let identity_public = *poly::public(&identity); let public_key = signer.public_key(); + let metrics_port = config.metrics_port; let ip = peers.get(&public_key).expect("Could not find self in IPs"); info!( ?public_key, @@ -190,6 +195,41 @@ fn main() { Some(3), ); + // Register mempool broadcast channel + let mempool_limit = Quota::per_second(NonZeroU32::new(8).unwrap()); + let mempool_broadcaster = network.register( + MEMPOOL_DIGEST_CHANNEL, + mempool_limit, + config.message_backlog, + Some(3), + ); + + let mempool_ack_limit = Quota::per_second(NonZeroU32::new(8).unwrap()); + let mempool_ack_broadcaster = network.register( + MEMPOOL_ACK_CHANNEL, + mempool_ack_limit, + config.message_backlog, + Some(3), + ); + + let mempool_batch_limit = Quota::per_second(NonZeroU32::new(8).unwrap()); + let mempool_batch_broadcaster = network.register( + MEMPOOL_BATCH_CHANNEL, + mempool_batch_limit, + config.message_backlog, + Some(3), + ); + + + let mempool_backfill_limit = Quota::per_second(NonZeroU32::new(8).unwrap()); + let mempool_backfill_broadcaster = network.register( + MMEPOOL_BACKFILL_CHANNEL, + mempool_backfill_limit, + config.message_backlog, + Some(3), + ); + + // Create network let p2p = network.start(); @@ -199,6 +239,46 @@ fn main() { indexer = Some(Client::new(&uri, identity_public.into())); } + // Create mempool/broadcast/Proof of Availability engine + let mempool_namespace = b"mempool"; + let (mempool_application, mempool_app_mailbox) = mempool::actor::Actor::::new(); + let broadcast_coordinator = mempool::coordinator::Coordinator::new(identity.clone(), peer_keys.clone(), share); + let (_, collector_mailbox) = mempool::collector::Collector::::new(mempool_namespace, identity_public); + let (broadcast_engine, broadcast_mailbox) = linked::Engine::new(context.with_label("broadcast_engine"), linked::Config { + crypto: signer.clone(), + coordinator: broadcast_coordinator.clone(), + application: mempool_app_mailbox.clone(), + collector: collector_mailbox, + mailbox_size: 1024, + verify_concurrent: 1024, + namespace: mempool_namespace.to_vec(), + refresh_epoch_timeout: Duration::from_millis(100), + rebroadcast_timeout: Duration::from_secs(5), + epoch_bounds: (1,1), + height_bound: 2, + journal_name_prefix: format!("broadcast-linked-seq/{}/", public_key), + journal_heights_per_section: 10, + journal_replay_concurrency: 1 + }); + + let (mempool, mempool_mailbox) = Mempool::init( + context.with_label("mempoool"), + mempool::mempool::Config { + batch_propose_interval: Duration::from_millis(500), + batch_size_limit: 1024*1024, + backfill_quota: Quota::per_second(NonZeroU32::new(10).unwrap()), + mailbox_size: 1024, + public_key: public_key, + block_height: 0, + partition_prefix: format!("mempool"), + } + ).await; + + let broadcast_engine = broadcast_engine.start(mempool_broadcaster, mempool_ack_broadcaster); + + let mempool_handler = mempool.start(mempool_batch_broadcaster, mempool_backfill_broadcaster, broadcast_coordinator, mempool_app_mailbox); + let mempool_broadcast_app_handler = context.with_label("mempool_app").spawn(|_| mempool_application.run(broadcast_mailbox, mempool_mailbox)); + // Create engine let config = engine::Config { partition_prefix: "engine".to_string(), @@ -220,6 +300,7 @@ fn main() { fetch_rate_per_peer: resolver_limit, indexer, }; + let engine = engine::Engine::new(context.with_label("engine"), config).await; // Start engine @@ -278,8 +359,8 @@ fn main() { }); // Serve metrics - let metrics = context.with_label("metrics").spawn(|context| async move { - let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), METRICS_PORT); + let metrics = context.with_label("metrics").spawn(move |context| async move { + let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), metrics_port); let listener = context .bind(addr) .await @@ -296,7 +377,7 @@ fn main() { }); // Wait for any task to error - if let Err(e) = try_join_all(vec![p2p, engine, system, metrics]).await { + if let Err(e) = try_join_all(vec![p2p, engine, broadcast_engine, system, metrics, mempool_handler, mempool_broadcast_app_handler]).await { error!(?e, "task failed"); } }); diff --git a/chain/src/lib.rs b/chain/src/lib.rs index 4e5bc2dc..02d3a20b 100644 --- a/chain/src/lib.rs +++ b/chain/src/lib.rs @@ -6,6 +6,7 @@ use serde::{Deserialize, Serialize}; pub mod actors; pub mod engine; +pub mod macros; /// Trait for interacting with an indexer. pub trait Indexer: Clone + Send + Sync + 'static { @@ -69,6 +70,8 @@ pub struct Config { pub worker_threads: usize, pub log_level: String, + pub metrics_port: u16, + pub allowed_peers: Vec, pub bootstrappers: Vec, diff --git a/chain/src/macros/mod.rs b/chain/src/macros/mod.rs new file mode 100644 index 00000000..a7e8e3e6 --- /dev/null +++ b/chain/src/macros/mod.rs @@ -0,0 +1,45 @@ +/// A macro that conditionally inserts a random delay between two asynchronous expressions during tests. +/// +/// When compiled in test mode (`#[cfg(test)]`), a random delay is inserted using the provided context's +/// `sleep` method. In non-test builds, the expressions are executed sequentially without delay. +/// The macro discards the return values of the expressions so that it always returns `()`. +/// +/// # Example +/// +/// ```rust +/// // Assume that `self.context` is defined and provides an async sleep method, +/// // and that `batch_network` and `app_mailbox` are in scope. +/// maybe_delay_between!(self.context, +/// app_mailbox.broadcast(batch.digest).await; +/// batch_network.0.send(Recipients::All, batch.serialize().into(), true) +/// .await.expect("failed to broadcast batch") +/// ); +/// ``` +/// +/// You can also include an optional trailing semicolon after the second expression: +/// +/// ```rust +/// maybe_delay_between!(self.context, +/// app_mailbox.broadcast(batch.digest).await; +/// batch_network.0.send(Recipients::All, batch.serialize().into(), true) +/// .await.expect("failed to broadcast batch"); +/// ); +/// ``` +#[macro_export] +macro_rules! maybe_delay_between { + ($context:expr, $first:expr; $second:expr $(;)?) => {{ + // Execute the first asynchronous expression and discard its value. + let _ = $first; + // If we're in a test build, add a random delay using the provided context. + #[cfg(test)] + { + use rand::Rng; + let delay_ms = rand::thread_rng().gen_range(50..150); // random delay between 50 and 150 ms + $context.sleep(std::time::Duration::from_millis(delay_ms)).await; + } + // Execute the second asynchronous expression and discard its value. + let _ = $second; + // Return unit. + () + }}; +} diff --git a/scripts/build.sh b/scripts/build.sh new file mode 100755 index 00000000..216f62a3 --- /dev/null +++ b/scripts/build.sh @@ -0,0 +1,6 @@ +#!/usr/bin/env bash + +CWD=$(pwd) + +cd chain && cargo build --bin validator --target x86_64-unknown-linux-gnu --target-dir ../targets +cp ../targets/x86_64-unknown-linux-gnu/debug/validator $CWD \ No newline at end of file diff --git a/scripts/start_validators.sh b/scripts/start_validators.sh new file mode 100755 index 00000000..9bf88029 --- /dev/null +++ b/scripts/start_validators.sh @@ -0,0 +1,29 @@ +#!/usr/bin/env bash + +cd ./chain + +NUM_VALIDATORS=${NUM_VALIDATORS:-5} +LOG_DIR=/tmp/alto-logs +PID_FILE=${LOG_DIR}/validators.pid +FLAG_FILE=${LOG_DIR}/network_running.flag + +# Check if a network is already running +if [ -f ${FLAG_FILE} ]; then + echo "Existing network detected (flag file ${FLAG_FILE} exists). Aborting launch." + exit 1 +fi + +# Create log directory if it doesn't exist and clear any old PID file +mkdir -p ${LOG_DIR} +[ -f ${PID_FILE} ] && rm ${PID_FILE} + +# Create the flag file to indicate that the network is running +touch ${FLAG_FILE} + +for i in $(seq 0 $(($NUM_VALIDATORS - 1))); do + echo "Launching validator $i..." + cargo run --bin validator -- --peers assets/peers.yaml --config assets/validator${i}.yaml > ${LOG_DIR}/validator${i}.log 2>&1 & + echo $! >> ${PID_FILE} +done + +echo "Validators launched. PIDs stored in ${PID_FILE}." diff --git a/scripts/stop_validators.sh b/scripts/stop_validators.sh new file mode 100755 index 00000000..68dfa408 --- /dev/null +++ b/scripts/stop_validators.sh @@ -0,0 +1,19 @@ +#!/usr/bin/env bash + +LOG_DIR=/tmp/alto-logs +PID_FILE=${LOG_DIR}/validators.pid +FLAG_FILE=${LOG_DIR}/network_running.flag + +if [ ! -f ${PID_FILE} ]; then + echo "No validators are running (PID file not found)." + exit 1 +fi + +while read pid; do + echo "Stopping validator process with PID ${pid}..." + kill ${pid} 2>/dev/null +done < ${PID_FILE} + +rm ${PID_FILE} +rm -f ${FLAG_FILE} +echo "All validators have been stopped and the network flag has been removed."