diff --git a/src/main/scala/scorex/core/network/NetworkController.scala b/src/main/scala/scorex/core/network/NetworkController.scala index 44cc2115a..9cfdb1e8c 100644 --- a/src/main/scala/scorex/core/network/NetworkController.scala +++ b/src/main/scala/scorex/core/network/NetworkController.scala @@ -66,10 +66,11 @@ class NetworkController(settings: NetworkSettings, log.info(s"Declared address: ${scorexContext.externalNodeAddress}") //bind to listen incoming connections - tcpManager ! Bind(self, bindAddress, options = Nil, pullMode = false) + tcpManager ! Bind(self, bindAddress, options = Nil, pullMode = true) - override def receive: Receive = - bindingLogic orElse + override def receive: Receive = bindingLogic + + def mainLogic(tcpListener: ActorRef): Receive = businessLogic orElse peerCommands orElse connectionEvents orElse @@ -80,6 +81,7 @@ class NetworkController(settings: NetworkSettings, case Bound(_) => log.info("Successfully bound to the port " + settings.bindAddress.getPort) scheduleConnectionToPeer() + context become mainLogic(sender()) case CommandFailed(_: Bind) => log.error("Network port " + settings.bindAddress.getPort + " already in use!") @@ -128,10 +130,12 @@ class NetworkController(settings: NetworkSettings, log.info(s"Unconfirmed connection: ($remoteAddress, $localAddress) => $connectionId") if (connectionDirection.isOutgoing) createPeerConnectionHandler(connectionId, sender()) else peerManagerRef ! ConfirmConnection(connectionId, sender()) + tcpManager ! Tcp.ResumeAccepting(1) case Connected(remoteAddress, _) => log.warn(s"Connection to peer $remoteAddress is already established") sender() ! Close + tcpManager ! Tcp.ResumeAccepting(1) case ConnectionConfirmed(connectionId, handlerRef) => log.info(s"Connection confirmed to $connectionId") @@ -192,7 +196,7 @@ class NetworkController(settings: NetworkSettings, */ private def scheduleConnectionToPeer(): Unit = { context.system.scheduler.schedule(5.seconds, 5.seconds) { - if (connections.size < settings.maxConnections) { + if (connections.size + unconfirmedConnections.size < settings.maxConnections) { val randomPeerF = peerManagerRef ? RandomPeerExcluding(connections.values.flatMap(_.peerInfo).toSeq) randomPeerF.mapTo[Option[PeerInfo]].foreach { peerInfoOpt => peerInfoOpt.foreach(peerInfo => self ! ConnectTo(peerInfo)) @@ -282,7 +286,9 @@ class NetworkController(settings: NetworkSettings, } else { peerManagerRef ! AddOrUpdatePeer(peerInfo) - val updatedConnectedPeer = connectedPeer.copy(peerInfo = Some(peerInfo)) + val updatedPeerSpec = peerInfo.peerSpec.copy(declaredAddress = Some(peerInfo.peerSpec.address.getOrElse(remoteAddress))) + val updatedPeerInfo = peerInfo.copy(peerSpec = updatedPeerSpec) + val updatedConnectedPeer = connectedPeer.copy(peerInfo = Some(updatedPeerInfo)) connections += remoteAddress -> updatedConnectedPeer context.system.eventStream.publish(HandshakedPeer(updatedConnectedPeer)) }