From ae88b01b7ab3d2422c722cba732171b9c16614de Mon Sep 17 00:00:00 2001 From: Lance Vincent Salera Date: Sat, 15 Mar 2025 06:12:54 +0800 Subject: [PATCH 1/5] feat: adjust transaction fetching capacity based on queued length --- src/pipeline/ingest.rs | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/src/pipeline/ingest.rs b/src/pipeline/ingest.rs index 2eb46a7..0072a8e 100644 --- a/src/pipeline/ingest.rs +++ b/src/pipeline/ingest.rs @@ -142,9 +142,16 @@ impl gasket::framework::Worker for Worker { &mut self, stage: &mut Stage, ) -> Result>, WorkerError> { + let queued_len = stage.output.queued_len().unwrap_or(0); + let current_cap = CAP - queued_len as u16; + + if current_cap == 0 { + return Ok(WorkSchedule::Idle); + } + let transactions = stage .priority - .next(TransactionStatus::Pending, CAP) + .next(TransactionStatus::Pending, current_cap) .await .or_retry()?; From ef69de9b1e5a5b7ce15a93d04d95c766c68abeb7 Mon Sep 17 00:00:00 2001 From: Lance Vincent Salera Date: Tue, 18 Mar 2025 03:05:32 +0800 Subject: [PATCH 2/5] fix: undequeued broadcast channel messages because of peer_manager receiver reference --- src/network/peer_manager.rs | 36 ++++++++++++++++++++++++++++++++---- src/pipeline/ingest.rs | 2 +- 2 files changed, 33 insertions(+), 5 deletions(-) diff --git a/src/network/peer_manager.rs b/src/network/peer_manager.rs index 7fdaadf..ed76148 100644 --- a/src/network/peer_manager.rs +++ b/src/network/peer_manager.rs @@ -2,12 +2,13 @@ use std::collections::HashMap; use std::sync::Arc; use std::time::Duration; +use gasket::messaging::RecvAdapter; use gasket::messaging::{tokio::ChannelRecvAdapter, InputPort}; use pallas::network::miniprotocols::peersharing::PeerAddress; use rand::seq::{IndexedMutRandom, IndexedRandom}; use serde::Deserialize; use thiserror::Error; -use tokio::sync::RwLock; +use tokio::sync::{Mutex, RwLock}; use tokio::time::timeout; use tracing::{error, info, warn}; @@ -25,7 +26,7 @@ pub enum PeerManagerError { pub struct PeerManager { network_magic: u64, peers: RwLock>>, - receiver: ChannelRecvAdapter>, + receiver: Arc>>>, } impl PeerManager { @@ -39,6 +40,8 @@ impl PeerManager { .map(|peer_addr| (peer_addr, None)) .collect(); + let receiver = Arc::new(Mutex::new(receiver)); + Self { network_magic, peers: RwLock::new(peers), @@ -48,13 +51,18 @@ impl PeerManager { pub async fn init(&self) -> Result<(), PeerManagerError> { let mut peers = self.peers.write().await; + for (peer_addr, peer) in peers.iter_mut() { let mut new_peer = Peer::new(peer_addr, self.network_magic); + let peer_rx_guard = self.receiver.lock().await; + let mut input = InputPort::>::default(); - input.connect(self.receiver.clone()); + input.connect((*peer_rx_guard).clone()); new_peer.input = Arc::new(RwLock::new(input)); + drop(peer_rx_guard); + new_peer.is_peer_sharing_enabled = new_peer .query_peer_sharing_mode() .await @@ -68,9 +76,25 @@ impl PeerManager { *peer = Some(new_peer); } + self.start_recv_drain().await; Ok(()) } + async fn start_recv_drain(&self) { + let receiver = Arc::clone(&self.receiver); + let timeout_duration = Duration::from_millis(500); + + tokio::spawn(async move { + loop { + let _ = timeout(timeout_duration, async { + let mut rx_guard = receiver.lock().await; + rx_guard.recv().await + }) + .await; + } + }); + } + pub async fn pick_peer_rand( &self, peers_per_request: u8, @@ -123,10 +147,14 @@ impl PeerManager { let mut new_peer = Peer::new(peer_addr, self.network_magic); + let peer_rx_guard = self.receiver.lock().await; + let mut input = InputPort::>::default(); - input.connect(self.receiver.clone()); + input.connect((*peer_rx_guard).clone()); new_peer.input = Arc::new(RwLock::new(input)); + drop(peer_rx_guard); + let timeout_duration = Duration::from_secs(5); match timeout(timeout_duration, new_peer.init()).await { diff --git a/src/pipeline/ingest.rs b/src/pipeline/ingest.rs index 0072a8e..c725a7e 100644 --- a/src/pipeline/ingest.rs +++ b/src/pipeline/ingest.rs @@ -142,7 +142,7 @@ impl gasket::framework::Worker for Worker { &mut self, stage: &mut Stage, ) -> Result>, WorkerError> { - let queued_len = stage.output.queued_len().unwrap_or(0); + let queued_len = stage.output.len(); let current_cap = CAP - queued_len as u16; if current_cap == 0 { From 47db8f96d068740b7954254c2aa77e0f58b7a589 Mon Sep 17 00:00:00 2001 From: Lance Vincent Salera Date: Tue, 18 Mar 2025 03:11:38 +0800 Subject: [PATCH 3/5] refactor: rename start_recv_drain to start_recv_task for clarity --- src/network/peer_manager.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/network/peer_manager.rs b/src/network/peer_manager.rs index ed76148..031666d 100644 --- a/src/network/peer_manager.rs +++ b/src/network/peer_manager.rs @@ -76,11 +76,11 @@ impl PeerManager { *peer = Some(new_peer); } - self.start_recv_drain().await; + self.start_recv_task().await; Ok(()) } - async fn start_recv_drain(&self) { + async fn start_recv_task(&self) { let receiver = Arc::clone(&self.receiver); let timeout_duration = Duration::from_millis(500); From 9e73546ded35b8dda64432390fafc8ffcf5bcf8d Mon Sep 17 00:00:00 2001 From: Lance Vincent Salera Date: Wed, 19 Mar 2025 19:47:14 +0800 Subject: [PATCH 4/5] refactor: update PeerManager to use broadcast sender's subscribe() --- src/network/peer_manager.rs | 47 ++++++++++--------------------------- src/pipeline/ingest.rs | 2 +- src/pipeline/mod.rs | 10 ++++++-- 3 files changed, 21 insertions(+), 38 deletions(-) diff --git a/src/network/peer_manager.rs b/src/network/peer_manager.rs index 031666d..fdd7ec1 100644 --- a/src/network/peer_manager.rs +++ b/src/network/peer_manager.rs @@ -2,14 +2,13 @@ use std::collections::HashMap; use std::sync::Arc; use std::time::Duration; -use gasket::messaging::RecvAdapter; -use gasket::messaging::{tokio::ChannelRecvAdapter, InputPort}; +use gasket::messaging::{tokio::ChannelRecvAdapter, InputPort, Message}; use pallas::network::miniprotocols::peersharing::PeerAddress; use rand::seq::{IndexedMutRandom, IndexedRandom}; use serde::Deserialize; use thiserror::Error; -use tokio::sync::{Mutex, RwLock}; -use tokio::time::timeout; +use tokio::sync::RwLock; +use tokio::{sync::broadcast::Sender, time::timeout}; use tracing::{error, info, warn}; use super::peer::{Peer, PeerError}; @@ -26,26 +25,24 @@ pub enum PeerManagerError { pub struct PeerManager { network_magic: u64, peers: RwLock>>, - receiver: Arc>>>, + sender: Sender>>, } impl PeerManager { pub fn new( network_magic: u64, peer_addresses: Vec, - receiver: ChannelRecvAdapter>, + sender: Sender>>, ) -> Self { let peers = peer_addresses .into_iter() .map(|peer_addr| (peer_addr, None)) .collect(); - let receiver = Arc::new(Mutex::new(receiver)); - Self { network_magic, peers: RwLock::new(peers), - receiver, + sender, } } @@ -55,13 +52,11 @@ impl PeerManager { for (peer_addr, peer) in peers.iter_mut() { let mut new_peer = Peer::new(peer_addr, self.network_magic); - let peer_rx_guard = self.receiver.lock().await; - let mut input = InputPort::>::default(); - input.connect((*peer_rx_guard).clone()); - new_peer.input = Arc::new(RwLock::new(input)); + let receiver = ChannelRecvAdapter::Broadcast(self.sender.subscribe()); - drop(peer_rx_guard); + input.connect(receiver); + new_peer.input = Arc::new(RwLock::new(input)); new_peer.is_peer_sharing_enabled = new_peer .query_peer_sharing_mode() @@ -76,25 +71,9 @@ impl PeerManager { *peer = Some(new_peer); } - self.start_recv_task().await; Ok(()) } - async fn start_recv_task(&self) { - let receiver = Arc::clone(&self.receiver); - let timeout_duration = Duration::from_millis(500); - - tokio::spawn(async move { - loop { - let _ = timeout(timeout_duration, async { - let mut rx_guard = receiver.lock().await; - rx_guard.recv().await - }) - .await; - } - }); - } - pub async fn pick_peer_rand( &self, peers_per_request: u8, @@ -147,13 +126,11 @@ impl PeerManager { let mut new_peer = Peer::new(peer_addr, self.network_magic); - let peer_rx_guard = self.receiver.lock().await; - let mut input = InputPort::>::default(); - input.connect((*peer_rx_guard).clone()); - new_peer.input = Arc::new(RwLock::new(input)); + let receiver = ChannelRecvAdapter::Broadcast(self.sender.subscribe()); - drop(peer_rx_guard); + input.connect(receiver); + new_peer.input = Arc::new(RwLock::new(input)); let timeout_duration = Duration::from_secs(5); diff --git a/src/pipeline/ingest.rs b/src/pipeline/ingest.rs index 176d89a..1d64e90 100644 --- a/src/pipeline/ingest.rs +++ b/src/pipeline/ingest.rs @@ -50,7 +50,7 @@ impl gasket::framework::Worker for Worker { &mut self, stage: &mut Stage, ) -> Result>, WorkerError> { - let queued_len = stage.output.len(); + let queued_len = stage.output.len().unwrap_or(0); let current_cap = CAP - queued_len as u16; if current_cap == 0 { diff --git a/src/pipeline/mod.rs b/src/pipeline/mod.rs index 51ffc39..93061d9 100644 --- a/src/pipeline/mod.rs +++ b/src/pipeline/mod.rs @@ -1,6 +1,7 @@ use std::sync::Arc; use anyhow::Result; +use gasket::messaging::tokio::ChannelSendAdapter; use crate::{ ledger::{ @@ -31,10 +32,15 @@ pub async fn run( let relay_adapter: Arc = Arc::new(MockRelayDataAdapter::new()); - let (sender, receiver) = gasket::messaging::tokio::broadcast_channel::>(CAP as usize); + let (sender, _) = gasket::messaging::tokio::broadcast_channel::>(CAP as usize); + + let broadcast_sender = match &sender { + ChannelSendAdapter::Broadcast(sender) => sender.clone(), + _ => panic!("Expected broadcast sender"), + }; let peer_addrs = config.peer_manager.peers.clone(); - let peer_manager = PeerManager::new(2, peer_addrs, receiver); + let peer_manager = PeerManager::new(2, peer_addrs, broadcast_sender); peer_manager.init().await?; From 5252394271f0d865aeec34d45517f98a9f3e1c8b Mon Sep 17 00:00:00 2001 From: Lance Vincent Salera Date: Thu, 20 Mar 2025 23:17:35 +0800 Subject: [PATCH 5/5] refactor: update PeerManager to use receiver instead of sender for input connection --- src/network/peer_manager.rs | 18 +++++++----------- src/pipeline/mod.rs | 10 ++-------- 2 files changed, 9 insertions(+), 19 deletions(-) diff --git a/src/network/peer_manager.rs b/src/network/peer_manager.rs index fdd7ec1..6bdf63f 100644 --- a/src/network/peer_manager.rs +++ b/src/network/peer_manager.rs @@ -2,13 +2,13 @@ use std::collections::HashMap; use std::sync::Arc; use std::time::Duration; -use gasket::messaging::{tokio::ChannelRecvAdapter, InputPort, Message}; +use gasket::messaging::{tokio::ChannelRecvAdapter, InputPort}; use pallas::network::miniprotocols::peersharing::PeerAddress; use rand::seq::{IndexedMutRandom, IndexedRandom}; use serde::Deserialize; use thiserror::Error; use tokio::sync::RwLock; -use tokio::{sync::broadcast::Sender, time::timeout}; +use tokio::time::timeout; use tracing::{error, info, warn}; use super::peer::{Peer, PeerError}; @@ -25,14 +25,14 @@ pub enum PeerManagerError { pub struct PeerManager { network_magic: u64, peers: RwLock>>, - sender: Sender>>, + receiver: ChannelRecvAdapter>, } impl PeerManager { pub fn new( network_magic: u64, peer_addresses: Vec, - sender: Sender>>, + receiver: ChannelRecvAdapter>, ) -> Self { let peers = peer_addresses .into_iter() @@ -42,7 +42,7 @@ impl PeerManager { Self { network_magic, peers: RwLock::new(peers), - sender, + receiver, } } @@ -53,9 +53,7 @@ impl PeerManager { let mut new_peer = Peer::new(peer_addr, self.network_magic); let mut input = InputPort::>::default(); - let receiver = ChannelRecvAdapter::Broadcast(self.sender.subscribe()); - - input.connect(receiver); + input.connect(self.receiver.clone()); new_peer.input = Arc::new(RwLock::new(input)); new_peer.is_peer_sharing_enabled = new_peer @@ -127,9 +125,7 @@ impl PeerManager { let mut new_peer = Peer::new(peer_addr, self.network_magic); let mut input = InputPort::>::default(); - let receiver = ChannelRecvAdapter::Broadcast(self.sender.subscribe()); - - input.connect(receiver); + input.connect(self.receiver.clone()); new_peer.input = Arc::new(RwLock::new(input)); let timeout_duration = Duration::from_secs(5); diff --git a/src/pipeline/mod.rs b/src/pipeline/mod.rs index 93061d9..51ffc39 100644 --- a/src/pipeline/mod.rs +++ b/src/pipeline/mod.rs @@ -1,7 +1,6 @@ use std::sync::Arc; use anyhow::Result; -use gasket::messaging::tokio::ChannelSendAdapter; use crate::{ ledger::{ @@ -32,15 +31,10 @@ pub async fn run( let relay_adapter: Arc = Arc::new(MockRelayDataAdapter::new()); - let (sender, _) = gasket::messaging::tokio::broadcast_channel::>(CAP as usize); - - let broadcast_sender = match &sender { - ChannelSendAdapter::Broadcast(sender) => sender.clone(), - _ => panic!("Expected broadcast sender"), - }; + let (sender, receiver) = gasket::messaging::tokio::broadcast_channel::>(CAP as usize); let peer_addrs = config.peer_manager.peers.clone(); - let peer_manager = PeerManager::new(2, peer_addrs, broadcast_sender); + let peer_manager = PeerManager::new(2, peer_addrs, receiver); peer_manager.init().await?;