diff --git a/src/network/peer_manager.rs b/src/network/peer_manager.rs index 7fdaadf..0b91add 100644 --- a/src/network/peer_manager.rs +++ b/src/network/peer_manager.rs @@ -2,12 +2,12 @@ use std::collections::HashMap; use std::sync::Arc; use std::time::Duration; -use gasket::messaging::{tokio::ChannelRecvAdapter, InputPort}; +use gasket::messaging::{tokio::ChannelRecvAdapter, InputPort, Message}; use pallas::network::miniprotocols::peersharing::PeerAddress; use rand::seq::{IndexedMutRandom, IndexedRandom}; use serde::Deserialize; use thiserror::Error; -use tokio::sync::RwLock; +use tokio::sync::{broadcast::Sender, RwLock}; use tokio::time::timeout; use tracing::{error, info, warn}; @@ -25,14 +25,14 @@ pub enum PeerManagerError { pub struct PeerManager { network_magic: u64, peers: RwLock>>, - receiver: ChannelRecvAdapter>, + sender: Sender>>, } impl PeerManager { pub fn new( network_magic: u64, peer_addresses: Vec, - receiver: ChannelRecvAdapter>, + sender: Sender>>, ) -> Self { let peers = peer_addresses .into_iter() @@ -42,7 +42,7 @@ impl PeerManager { Self { network_magic, peers: RwLock::new(peers), - receiver, + sender, } } @@ -52,7 +52,9 @@ impl PeerManager { let mut new_peer = Peer::new(peer_addr, self.network_magic); let mut input = InputPort::>::default(); - input.connect(self.receiver.clone()); + let receiver = ChannelRecvAdapter::Broadcast(self.sender.subscribe()); + + input.connect(receiver); new_peer.input = Arc::new(RwLock::new(input)); new_peer.is_peer_sharing_enabled = new_peer @@ -124,7 +126,9 @@ impl PeerManager { let mut new_peer = Peer::new(peer_addr, self.network_magic); let mut input = InputPort::>::default(); - input.connect(self.receiver.clone()); + let receiver = ChannelRecvAdapter::Broadcast(self.sender.subscribe()); + + input.connect(receiver); new_peer.input = Arc::new(RwLock::new(input)); let timeout_duration = Duration::from_secs(5); diff --git a/src/pipeline/mod.rs b/src/pipeline/mod.rs index 51ffc39..93061d9 100644 --- a/src/pipeline/mod.rs +++ b/src/pipeline/mod.rs @@ -1,6 +1,7 @@ use std::sync::Arc; use anyhow::Result; +use gasket::messaging::tokio::ChannelSendAdapter; use crate::{ ledger::{ @@ -31,10 +32,15 @@ pub async fn run( let relay_adapter: Arc = Arc::new(MockRelayDataAdapter::new()); - let (sender, receiver) = gasket::messaging::tokio::broadcast_channel::>(CAP as usize); + let (sender, _) = gasket::messaging::tokio::broadcast_channel::>(CAP as usize); + + let broadcast_sender = match &sender { + ChannelSendAdapter::Broadcast(sender) => sender.clone(), + _ => panic!("Expected broadcast sender"), + }; let peer_addrs = config.peer_manager.peers.clone(); - let peer_manager = PeerManager::new(2, peer_addrs, receiver); + let peer_manager = PeerManager::new(2, peer_addrs, broadcast_sender); peer_manager.init().await?;