diff --git a/src/network/peer_manager.rs b/src/network/peer_manager.rs index 7fdaadf..6bdf63f 100644 --- a/src/network/peer_manager.rs +++ b/src/network/peer_manager.rs @@ -48,6 +48,7 @@ impl PeerManager { pub async fn init(&self) -> Result<(), PeerManagerError> { let mut peers = self.peers.write().await; + for (peer_addr, peer) in peers.iter_mut() { let mut new_peer = Peer::new(peer_addr, self.network_magic); diff --git a/src/pipeline/ingest.rs b/src/pipeline/ingest.rs index bf96bad..1d64e90 100644 --- a/src/pipeline/ingest.rs +++ b/src/pipeline/ingest.rs @@ -50,9 +50,16 @@ impl gasket::framework::Worker for Worker { &mut self, stage: &mut Stage, ) -> Result>, WorkerError> { + let queued_len = stage.output.len().unwrap_or(0); + let current_cap = CAP - queued_len as u16; + + if current_cap == 0 { + return Ok(WorkSchedule::Idle); + } + let transactions = stage .priority - .next(TransactionStatus::Pending, CAP) + .next(TransactionStatus::Pending, current_cap) .await .or_retry()?;