Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions chain-signatures/contract-sol/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,7 @@ pub struct SignatureRespondedEvent {
}

#[event]
#[derive(Clone)]
pub struct RespondBidirectionalEvent {
pub request_id: [u8; 32],
pub responder: Pubkey,
Expand Down
14 changes: 12 additions & 2 deletions chain-signatures/node/src/protocol/message/sub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,8 +110,18 @@ impl<T> Subscriber<T> {

pub async fn send(&self, msg: T) -> Result<(), mpsc::error::SendError<T>> {
match self {
Self::Subscribed(tx) => tx.send(msg).await,
Self::Unsubscribed(tx, _) => tx.send(msg).await,
Self::Subscribed(tx) => {
let cap = tx.capacity();
let max_cap = tx.max_capacity();
tracing::warn!("Sending to subscribed, capacity {cap}/{max_cap}");
tx.send(msg).await
}
Self::Unsubscribed(tx, _) => {
let cap = tx.capacity();
let max_cap = tx.max_capacity();
tracing::warn!("Sending to unsubscribed, capacity {cap}/{max_cap}");
tx.send(msg).await
}
Self::Unknown => Ok(()),
}
}
Expand Down
11 changes: 1 addition & 10 deletions chain-signatures/node/src/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1600,8 +1600,6 @@ use signet_program::accounts::Respond as SolanaRespondAccount;
use signet_program::accounts::RespondBidirectional as SolanaRespondBidirectionalAccount;
use signet_program::instruction::Respond as SolanaRespond;
use signet_program::instruction::RespondBidirectional as SolanaRespondBidirectional;
use signet_program::AffinePoint as SolanaContractAffinePoint;
use signet_program::Signature as SolanaContractSignature;
use solana_sdk::signature::Signer as SolanaSigner;
async fn try_publish_sol(
sol: &SolanaClient,
Expand All @@ -1614,14 +1612,7 @@ async fn try_publish_sol(
let sign_id = action.indexed.id;
let request_ids = vec![action.indexed.id.request_id];
let big_r = signature.big_r.to_encoded_point(false);
let signature = SolanaContractSignature {
big_r: SolanaContractAffinePoint {
x: big_r.as_bytes()[1..33].try_into().unwrap(),
y: big_r.as_bytes()[33..65].try_into().unwrap(),
},
s: signature.s.to_bytes().into(),
recovery_id: signature.recovery_id,
};
let signature = crate::util::mpc_to_sol_signature(signature, big_r);

tracing::debug!(
?sign_id,
Expand Down
1 change: 1 addition & 0 deletions chain-signatures/node/src/stream/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ pub fn channel() -> (mpsc::Sender<ChainEvent>, mpsc::Receiver<ChainEvent>) {

/// Unified event produced by a chain stream
#[allow(clippy::large_enum_variant)]
#[derive(Clone)]
pub enum ChainEvent {
SignRequest(IndexedSignRequest),
Respond(SignatureRespondedEvent),
Expand Down
1 change: 1 addition & 0 deletions chain-signatures/node/src/stream/ops.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ impl SignBidirectionalEvent {
}
}

#[derive(Clone)]
pub enum RespondBidirectionalEvent {
Solana(signet_program::RespondBidirectionalEvent),
Hydration(HydrationRespondBidirectionalEvent),
Expand Down
14 changes: 14 additions & 0 deletions chain-signatures/node/src/util/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,20 @@ impl AffinePointExt for AffinePoint {
}
}

pub fn mpc_to_sol_signature(
signature: &mpc_primitives::Signature,
big_r: k256::EncodedPoint,
) -> signet_program::Signature {
signet_program::Signature {
big_r: signet_program::AffinePoint {
x: big_r.as_bytes()[1..33].try_into().unwrap(),
y: big_r.as_bytes()[33..65].try_into().unwrap(),
},
s: signature.s.to_bytes().into(),
recovery_id: signature.recovery_id,
}
}

pub fn is_elapsed_longer_than_timeout(timestamp_sec: u64, timeout: u64) -> bool {
if let LocalResult::Single(msg_timestamp) = Utc.timestamp_opt(timestamp_sec as i64, 0) {
let timeout = Duration::from_millis(timeout);
Expand Down
30 changes: 28 additions & 2 deletions integration-tests/src/mpc_fixture/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use crate::mpc_fixture::fixture_tasks::MessageFilter;
use crate::mpc_fixture::input::FixtureInput;
use crate::mpc_fixture::message_collector::CollectMessages;
use crate::mpc_fixture::mock_governance::MockGovernance;
use crate::mpc_fixture::mock_stream::MockStream;
use crate::mpc_fixture::{fixture_tasks, MpcFixture, MpcFixtureNode};
use cait_sith::protocol::Participant;
use mpc_contract::config::{
Expand Down Expand Up @@ -54,6 +55,7 @@ struct MpcFixtureNodeBuilder {
config: Config,
messaging: NodeMessagingBuilder,
key_info: Option<NodeKeyInfo>,
mock_streams: Vec<MockStream>,
}

/// Config options for the test setup.
Expand Down Expand Up @@ -388,6 +390,17 @@ impl MpcFixtureBuilder {
.with_node_min_triples(0)
.with_node_min_presignatures(0)
}

/// Add a mock stream to all nodes.
///
/// Each node will have a independent deep-clone of the provided stream.
/// Events are thus delivered to all nodes.
pub async fn with_mock_stream(mut self, stream: MockStream) -> Self {
for node in &mut self.prepared_nodes {
node.mock_streams.push(stream.deep_clone().await)
}
self
}
}

impl MpcFixtureNodeBuilder {
Expand Down Expand Up @@ -435,6 +448,7 @@ impl MpcFixtureNodeBuilder {
config,
messaging,
key_info: None,
mock_streams: vec![],
}
}

Expand Down Expand Up @@ -496,10 +510,20 @@ impl MpcFixtureNodeBuilder {
me: account_id.clone(),
protocol_state_tx,
},
context.contract_state,
context.contract_state.clone(),
mesh_rx.clone(),
));

let backlog = Backlog::new();

fixture_tasks::start_mock_stream_tasks(
&self.mock_streams,
sign_tx.clone(),
backlog.clone(),
context.contract_state,
&mesh_rx,
);

// handle outbox messages manually, we want them before they are
// encrypted and we want to send them directly to other node's inboxes
let _mock_network_handle = fixture_tasks::test_mock_network(
Expand All @@ -510,6 +534,7 @@ impl MpcFixtureNodeBuilder {
mesh_tx.clone(),
config_tx.clone(),
self.messaging.filter,
self.mock_streams.clone(),
);

let mut node = MpcFixtureNode {
Expand All @@ -519,9 +544,10 @@ impl MpcFixtureNodeBuilder {
config: config_tx,
sign_tx,
msg_channel: self.messaging.channel,
mock_streams: self.mock_streams,
triple_storage,
presignature_storage,
backlog: Backlog::new(),
backlog,
web_handle: None,
};

Expand Down
4 changes: 3 additions & 1 deletion integration-tests/src/mpc_fixture/fixture_interface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

use crate::containers::Redis;
use crate::mpc_fixture::message_collector::{CollectMessages, MessagePrinter};
use crate::mpc_fixture::mock_stream::MockStream;
use cait_sith::protocol::Participant;
use mpc_node::backlog::Backlog;
use mpc_node::config::Config;
Expand Down Expand Up @@ -33,6 +34,7 @@ pub struct MpcFixtureNode {

pub sign_tx: mpsc::Sender<Sign>,
pub msg_channel: MessageChannel,
pub mock_streams: Vec<MockStream>,

pub triple_storage: TripleStorage,
pub presignature_storage: PresignatureStorage,
Expand Down Expand Up @@ -124,7 +126,7 @@ impl MpcFixture {
let actions: tokio::sync::MutexGuard<'_, HashSet<String>> =
self.output.rpc_actions.lock().await;

tracing::info!("All published RPC actions:");
tracing::info!(count = actions.len(), "All published RPC actions:");
for action in actions.iter() {
tracing::info!("{action}");
}
Expand Down
37 changes: 34 additions & 3 deletions integration-tests/src/mpc_fixture/fixture_tasks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,26 @@
//! passing between nodes and updates to the governance smart contract.

use crate::mpc_fixture::fixture_interface::SharedOutput;
use crate::mpc_fixture::mock_stream::MockStream;
use cait_sith::protocol::Participant;
use mpc_keys::hpke::Ciphered;
use mpc_node::backlog::Backlog;
use mpc_node::config::Config;
use mpc_node::mesh::MeshState;
use mpc_node::node_client::NodeClient;
use mpc_node::protocol::message::{MessageOutbox, SendMessage, SignedMessage};
use mpc_node::rpc::RpcAction;
use mpc_node::protocol::Sign;
use mpc_node::rpc::{ContractStateWatcher, RpcAction};
use mpc_node::stream::run_stream;
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::mpsc::{Receiver, Sender};
use tokio::sync::mpsc::{self, Receiver, Sender};
use tokio::sync::watch;
use tokio::task::JoinHandle;

pub type MessageFilter = Box<dyn FnMut(&SendMessage) -> bool + Send>;

#[allow(clippy::too_many_arguments)]
pub(super) fn test_mock_network(
routing_table: HashMap<Participant, Sender<Ciphered>>,
shared_output: &SharedOutput,
Expand All @@ -24,6 +30,7 @@ pub(super) fn test_mock_network(
mesh: watch::Sender<MeshState>,
config: watch::Sender<Config>,
mut filter: MessageFilter,
mock_streams: Vec<MockStream>,
) -> JoinHandle<()> {
let msg_log = Arc::clone(&shared_output.msg_log);
let rpc_actions = Arc::clone(&shared_output.rpc_actions);
Expand Down Expand Up @@ -67,7 +74,7 @@ pub(super) fn test_mock_network(
}

Some(rpc) = rpc_rx.recv() => {
let action_str = match rpc {
let action_str = match &rpc {
RpcAction::Publish(publish_action) => {
format!(
"RpcAction::Publish({:?})",
Expand All @@ -78,6 +85,10 @@ pub(super) fn test_mock_network(
tracing::info!(target: "mock_network", ?action_str, "Received RPC action");
let mut actions_log = rpc_actions.lock().await;
actions_log.insert(action_str);
let block = [rpc];
for stream in &mock_streams {
stream.rpc_actions(&block).await;
}
}

else => {
Expand All @@ -89,3 +100,23 @@ pub(super) fn test_mock_network(
tracing::info!(target: "mock_network", "Test mock network task exited");
})
}

pub(super) fn start_mock_stream_tasks(
mock_streams: &[MockStream],
sign_tx: mpsc::Sender<Sign>,
backlog: Backlog,
contract_watcher: ContractStateWatcher,
mesh_state: &watch::Receiver<MeshState>,
) {
for stream in mock_streams {
tokio::spawn(run_stream(
stream.clone(),
sign_tx.clone(),
backlog.clone(),
contract_watcher.clone(),
mesh_state.clone(),
// Only used for backlog recovery - not implemented in component tests yet
NodeClient::new(&Default::default()),
));
}
}
Loading