diff --git a/src/it/scala/io/iohk/ethereum/txExecTest/util/DumpChainActor.scala b/src/it/scala/io/iohk/ethereum/txExecTest/util/DumpChainActor.scala index 759ab21c11..43c111f926 100644 --- a/src/it/scala/io/iohk/ethereum/txExecTest/util/DumpChainActor.scala +++ b/src/it/scala/io/iohk/ethereum/txExecTest/util/DumpChainActor.scala @@ -140,13 +140,13 @@ class DumpChainActor( val account = n.value.toArray[Byte].toAccount if (account.codeHash != DumpChainActor.emptyEvm) { - peers.headOption.foreach { case Peer(_, _, _) => + peers.headOption.foreach { _ => evmTorequest = evmTorequest :+ account.codeHash evmCodeHashes = evmCodeHashes + account.codeHash } } if (account.storageRoot != DumpChainActor.emptyStorage) { - peers.headOption.foreach { case Peer(_, _, _) => + peers.headOption.foreach { _ => contractChildren = contractChildren :+ account.storageRoot contractNodesHashes = contractNodesHashes + account.storageRoot } diff --git a/src/main/scala/io/iohk/ethereum/network/ConnectedPeers.scala b/src/main/scala/io/iohk/ethereum/network/ConnectedPeers.scala new file mode 100644 index 0000000000..b334b64cea --- /dev/null +++ b/src/main/scala/io/iohk/ethereum/network/ConnectedPeers.scala @@ -0,0 +1,74 @@ +package io.iohk.ethereum.network + +import java.net.InetSocketAddress + +import akka.actor.ActorRef +import akka.util.ByteString + +case class ConnectedPeers( + private val incomingPendingPeers: Map[PeerId, Peer], + private val outgoingPendingPeers: Map[PeerId, Peer], + private val handshakedPeers: Map[PeerId, Peer] +) { + + // FIXME: Kept only for compatibility purposes, should eventually be removed + lazy val peers: Map[PeerId, Peer] = outgoingPendingPeers ++ handshakedPeers + + private lazy val allPeers: Map[PeerId, Peer] = outgoingPendingPeers ++ handshakedPeers ++ incomingPendingPeers + + private lazy val allPeersRemoteAddresses: Set[InetSocketAddress] = allPeers.values.map(_.remoteAddress).toSet + def isConnectionHandled(remoteAddress: InetSocketAddress): Boolean = + allPeersRemoteAddresses.contains(remoteAddress) + + /* + We have the node id of our outgoing pending peers so we could use that in our checks, by rejecting a peer that + handshaked to us with the same node id. + However, with checking the node id of only handshaked peers we prioritize handshaked peers over pending ones, + in the above mentioned case the repeated pending peer connection will eventually die out + */ + private lazy val handshakedPeersNodeIds: Set[ByteString] = handshakedPeers.values.flatMap(_.nodeId).toSet + def hasHandshakedWith(nodeId: ByteString): Boolean = + handshakedPeersNodeIds.contains(nodeId) + + lazy val incomingPendingPeersCount: Int = incomingPendingPeers.size + lazy val incomingHandshakedPeersCount: Int = handshakedPeers.count { case (_, p) => p.incomingConnection } + lazy val outgoingPeersCount: Int = peers.count { case (_, p) => !p.incomingConnection } + + lazy val handshakedPeersCount: Int = handshakedPeers.size + lazy val pendingPeersCount: Int = incomingPendingPeersCount + outgoingPendingPeers.size + + def getPeer(peerId: PeerId): Option[Peer] = peers.get(peerId) + + def addNewPendingPeer(pendingPeer: Peer): ConnectedPeers = { + if (pendingPeer.incomingConnection) + copy(incomingPendingPeers = incomingPendingPeers + (pendingPeer.id -> pendingPeer)) + else + copy(outgoingPendingPeers = outgoingPendingPeers + (pendingPeer.id -> pendingPeer)) + } + + def promotePeerToHandshaked(peerAfterHandshake: Peer): ConnectedPeers = { + if (peerAfterHandshake.incomingConnection) + copy( + incomingPendingPeers = incomingPendingPeers - peerAfterHandshake.id, + handshakedPeers = handshakedPeers + (peerAfterHandshake.id -> peerAfterHandshake) + ) + else + copy( + outgoingPendingPeers = outgoingPendingPeers - peerAfterHandshake.id, + handshakedPeers = handshakedPeers + (peerAfterHandshake.id -> peerAfterHandshake) + ) + } + + def removeTerminatedPeer(peerRef: ActorRef): (Iterable[PeerId], ConnectedPeers) = { + val peersId = allPeers.collect { case (id, peer) if peer.ref == peerRef => id } + + ( + peersId, + ConnectedPeers(incomingPendingPeers -- peersId, outgoingPendingPeers -- peersId, handshakedPeers -- peersId) + ) + } +} + +object ConnectedPeers { + def empty: ConnectedPeers = ConnectedPeers(Map.empty, Map.empty, Map.empty) +} diff --git a/src/main/scala/io/iohk/ethereum/network/peer.scala b/src/main/scala/io/iohk/ethereum/network/Peer.scala similarity index 50% rename from src/main/scala/io/iohk/ethereum/network/peer.scala rename to src/main/scala/io/iohk/ethereum/network/Peer.scala index 64b1777786..7c290001b4 100644 --- a/src/main/scala/io/iohk/ethereum/network/peer.scala +++ b/src/main/scala/io/iohk/ethereum/network/Peer.scala @@ -3,11 +3,21 @@ package io.iohk.ethereum.network import java.net.InetSocketAddress import akka.actor.ActorRef +import akka.util.ByteString import io.iohk.ethereum.blockchain.sync.BlacklistSupport.BlackListId case class PeerId(value: String) extends BlackListId -case class Peer(remoteAddress: InetSocketAddress, ref: ActorRef, incomingConnection: Boolean) { +object PeerId { + def fromRef(ref: ActorRef): PeerId = PeerId(ref.path.name) +} + +case class Peer( + remoteAddress: InetSocketAddress, + ref: ActorRef, + incomingConnection: Boolean, + nodeId: Option[ByteString] = None +) { // FIXME PeerId should be actual peerId i.e id derived form node public key - def id: PeerId = PeerId(ref.path.name) + def id: PeerId = PeerId.fromRef(ref) } diff --git a/src/main/scala/io/iohk/ethereum/network/PeerActor.scala b/src/main/scala/io/iohk/ethereum/network/PeerActor.scala index 357c94b91a..09a10c6804 100644 --- a/src/main/scala/io/iohk/ethereum/network/PeerActor.scala +++ b/src/main/scala/io/iohk/ethereum/network/PeerActor.scala @@ -49,9 +49,7 @@ class PeerActor[R <: HandshakeResult]( def scheduler: Scheduler = externalSchedulerOpt getOrElse system.scheduler - val peerId: PeerId = PeerId(self.path.name) - - val peer: Peer = Peer(peerAddress, self, incomingConnection) + val peerId: PeerId = PeerId.fromRef(self) override def receive: Receive = waitingForInitialCommand @@ -87,7 +85,7 @@ class PeerActor[R <: HandshakeResult]( case RLPxConnectionHandler.ConnectionEstablished(remoteNodeId) => val newUri = rlpxConnection.uriOpt.map(outGoingUri => modifyOutGoingUri(remoteNodeId, rlpxConnection, outGoingUri)) - processHandshakerNextMessage(initHandshaker, rlpxConnection.copy(uriOpt = newUri), numRetries) + processHandshakerNextMessage(initHandshaker, remoteNodeId, rlpxConnection.copy(uriOpt = newUri), numRetries) case RLPxConnectionHandler.ConnectionFailed => log.debug("Failed to establish RLPx connection") @@ -109,6 +107,7 @@ class PeerActor[R <: HandshakeResult]( def processingHandshaking( handshaker: Handshaker[R], + remoteNodeId: ByteString, rlpxConnection: RLPxConnection, timeout: Cancellable, numRetries: Int @@ -122,14 +121,14 @@ class PeerActor[R <: HandshakeResult]( // handles the received message handshaker.applyMessage(msg).foreach { newHandshaker => timeout.cancel() - processHandshakerNextMessage(newHandshaker, rlpxConnection, numRetries) + processHandshakerNextMessage(newHandshaker, remoteNodeId, rlpxConnection, numRetries) } handshaker.respondToRequest(msg).foreach(msgToSend => rlpxConnection.sendMessage(msgToSend)) case ResponseTimeout => timeout.cancel() val newHandshaker = handshaker.processTimeout - processHandshakerNextMessage(newHandshaker, rlpxConnection, numRetries) + processHandshakerNextMessage(newHandshaker, remoteNodeId, rlpxConnection, numRetries) case GetStatus => sender() ! StatusResponse(Handshaking(numRetries)) @@ -145,6 +144,7 @@ class PeerActor[R <: HandshakeResult]( */ private def processHandshakerNextMessage( handshaker: Handshaker[R], + remoteNodeId: ByteString, rlpxConnection: RLPxConnection, numRetries: Int ): Unit = @@ -152,11 +152,11 @@ class PeerActor[R <: HandshakeResult]( case Right(NextMessage(msgToSend, timeoutTime)) => rlpxConnection.sendMessage(msgToSend) val newTimeout = scheduler.scheduleOnce(timeoutTime, self, ResponseTimeout) - context become processingHandshaking(handshaker, rlpxConnection, newTimeout, numRetries) + context become processingHandshaking(handshaker, remoteNodeId, rlpxConnection, newTimeout, numRetries) case Left(HandshakeSuccess(handshakeResult)) => rlpxConnection.uriOpt.foreach { uri => knownNodesManager ! KnownNodesManager.AddKnownNode(uri) } - context become new HandshakedPeer(rlpxConnection, handshakeResult).receive + context become new HandshakedPeer(remoteNodeId, rlpxConnection, handshakeResult).receive unstashAll() case Left(HandshakeFailure(reason)) => @@ -244,8 +244,9 @@ class PeerActor[R <: HandshakeResult]( stash() } - class HandshakedPeer(rlpxConnection: RLPxConnection, handshakeResult: R) { + class HandshakedPeer(remoteNodeId: ByteString, rlpxConnection: RLPxConnection, handshakeResult: R) { + val peer: Peer = Peer(peerAddress, self, incomingConnection, Some(remoteNodeId)) peerEventBus ! Publish(PeerHandshakeSuccessful(peer, handshakeResult)) /** diff --git a/src/main/scala/io/iohk/ethereum/network/PeerManagerActor.scala b/src/main/scala/io/iohk/ethereum/network/PeerManagerActor.scala index 0b2a585917..7e1ed32eee 100644 --- a/src/main/scala/io/iohk/ethereum/network/PeerManagerActor.scala +++ b/src/main/scala/io/iohk/ethereum/network/PeerManagerActor.scala @@ -4,7 +4,7 @@ import java.net.{InetSocketAddress, URI} import akka.actor.SupervisorStrategy.Stop import akka.actor._ -import akka.util.Timeout +import akka.util.{ByteString, Timeout} import io.iohk.ethereum.blockchain.sync.BlacklistSupport import io.iohk.ethereum.blockchain.sync.BlacklistSupport.BlackListId import io.iohk.ethereum.network.PeerActor.PeerClosedConnection @@ -18,6 +18,7 @@ import io.iohk.ethereum.network.p2p.messages.WireProtocol.Disconnect import io.iohk.ethereum.network.p2p.{MessageDecoder, MessageSerializable} import io.iohk.ethereum.network.rlpx.AuthHandshaker import io.iohk.ethereum.network.rlpx.RLPxConnectionHandler.RLPxConfiguration +import org.bouncycastle.util.encoders.Hex import scala.concurrent.ExecutionContext.Implicits.global import scala.concurrent.Future @@ -59,12 +60,10 @@ class PeerManagerActor( Stop } - override def receive: Receive = start(Map.empty, Map.empty) - - def start(pendingPeers: PeerMap, peers: PeerMap): Receive = { case StartConnecting => + override def receive: Receive = { case StartConnecting => scheduleNodesUpdate() knownNodesManager ! KnownNodesManager.GetKnownNodes - context become listen(pendingPeers, peers) + context become listening(ConnectedPeers.empty) unstashAll() } @@ -77,16 +76,16 @@ class PeerManagerActor( ) } - def listen(pendingPeers: PeerMap, peers: PeerMap): Receive = { - handleCommonMessages(pendingPeers, peers) orElse + def listening(connectedPeers: ConnectedPeers): Receive = { + handleCommonMessages(connectedPeers) orElse handleBlacklistMessages orElse - connections(pendingPeers, peers) orElse - nodes(pendingPeers, peers) orElse { case _ => + connections(connectedPeers) orElse + handleNewNodesToConnectMessages(connectedPeers) orElse { case _ => stash() } } - def nodes(pendingPeers: PeerMap, peers: PeerMap): Receive = { + def handleNewNodesToConnectMessages(connectedPeers: ConnectedPeers): Receive = { case KnownNodesManager.KnownNodes(nodes) => val nodesToConnect = nodes.take(peerConfiguration.maxOutgoingPeers) @@ -98,23 +97,25 @@ class PeerManagerActor( } case PeerDiscoveryManager.DiscoveredNodesInfo(nodesInfo) => - val peerAddresses = outgoingPeersAddresses(peers) - val nodesToConnect = nodesInfo .filterNot { discoveryNodeInfo => val socketAddress = discoveryNodeInfo.node.tcpSocketAddress - peerAddresses.contains(socketAddress) || isBlacklisted(PeerAddress(socketAddress.getHostString)) + val alreadyConnected = connectedPeers.isConnectionHandled(socketAddress) || connectedPeers.hasHandshakedWith( + discoveryNodeInfo.node.id + ) + alreadyConnected || isBlacklisted(PeerAddress(socketAddress.getHostString)) } // not already connected to or blacklisted - .take(peerConfiguration.maxOutgoingPeers - peerAddresses.size) + .take(peerConfiguration.maxOutgoingPeers - connectedPeers.outgoingPeersCount) NetworkMetrics.DiscoveredPeersSize.set(nodesInfo.size) NetworkMetrics.BlacklistedPeersSize.set(blacklistedPeers.size) - NetworkMetrics.PendingPeersSize.set(pendingPeers.size) + NetworkMetrics.PendingPeersSize.set(connectedPeers.pendingPeersCount) log.info( s"Discovered ${nodesInfo.size} nodes, " + s"Blacklisted ${blacklistedPeers.size} nodes, " + - s"connected to ${peers.size}/${peerConfiguration.maxOutgoingPeers + peerConfiguration.maxIncomingPeers}. " + + s"handshaked to ${connectedPeers.handshakedPeersCount}/${peerConfiguration.maxOutgoingPeers + peerConfiguration.maxIncomingPeers}, " + + s"pending connection attempts ${connectedPeers.pendingPeersCount}. " + s"Trying to connect to ${nodesToConnect.size} more nodes." ) @@ -126,7 +127,7 @@ class PeerManagerActor( } } - def connections(pendingPeers: PeerMap, peers: PeerMap): Receive = { + def connections(connectedPeers: ConnectedPeers): Receive = { case PeerClosedConnection(peerAddress, reason) => blacklist( PeerAddress(peerAddress), @@ -135,10 +136,10 @@ class PeerManagerActor( ) case HandlePeerConnection(connection, remoteAddress) => - handleConnection(connection, remoteAddress, pendingPeers, peers) + handleConnection(connection, remoteAddress, connectedPeers) case ConnectToPeer(uri) => - connectWith(uri, pendingPeers, peers) + connectWith(uri, connectedPeers) } def getBlacklistDuration(reason: Long): FiniteDuration = { @@ -152,17 +153,16 @@ class PeerManagerActor( private def handleConnection( connection: ActorRef, remoteAddress: InetSocketAddress, - pendingPeers: PeerMap, - peers: PeerMap + connectedPeers: ConnectedPeers ): Unit = { - val connectionHandled = isConnectionHandled(remoteAddress, pendingPeers, peers) - val isPendingPeersNotMaxValue = pendingPeers.size < peerConfiguration.maxPendingPeers + val alreadyConnectedToPeer = connectedPeers.isConnectionHandled(remoteAddress) + val isPendingPeersNotMaxValue = connectedPeers.incomingPendingPeersCount < peerConfiguration.maxPendingPeers val validConnection = for { validHandler <- validateConnection( remoteAddress, IncomingConnectionAlreadyHandled(remoteAddress, connection), - connectionHandled + !alreadyConnectedToPeer ) validNumber <- validateConnection( validHandler, @@ -173,90 +173,82 @@ class PeerManagerActor( validConnection match { case Right(address) => - val (peer, newPendingPeers, newPeers) = createPeer(address, incomingConnection = true, pendingPeers, peers) + val (peer, newConnectedPeers) = createPeer(address, incomingConnection = true, connectedPeers) peer.ref ! PeerActor.HandleConnection(connection, remoteAddress) - context become listen(newPendingPeers, newPeers) + context become listening(newConnectedPeers) case Left(error) => handleConnectionErrors(error) } } - private def connectWith(uri: URI, pendingPeers: PeerMap, peers: PeerMap): Unit = { + private def connectWith(uri: URI, connectedPeers: ConnectedPeers): Unit = { + val nodeId = ByteString(Hex.decode(uri.getUserInfo)) val remoteAddress = new InetSocketAddress(uri.getHost, uri.getPort) - val connectionHandled = isConnectionHandled(remoteAddress, pendingPeers, peers) - val isOutgoingPeersNotMaxValue = countOutgoingPeers(peers) < peerConfiguration.maxOutgoingPeers + val alreadyConnectedToPeer = + connectedPeers.hasHandshakedWith(nodeId) || connectedPeers.isConnectionHandled(remoteAddress) + val isOutgoingPeersNotMaxValue = connectedPeers.outgoingPeersCount < peerConfiguration.maxOutgoingPeers val validConnection = for { - validHandler <- validateConnection(remoteAddress, OutgoingConnectionAlreadyHandled(uri), connectionHandled) + validHandler <- validateConnection(remoteAddress, OutgoingConnectionAlreadyHandled(uri), !alreadyConnectedToPeer) validNumber <- validateConnection(validHandler, MaxOutgoingConnections, isOutgoingPeersNotMaxValue) } yield validNumber validConnection match { case Right(address) => - val (peer, newPendingPeers, newPeers) = createPeer(address, incomingConnection = false, pendingPeers, peers) + val (peer, newConnectedPeers) = createPeer(address, incomingConnection = false, connectedPeers) peer.ref ! PeerActor.ConnectTo(uri) - context become listen(newPendingPeers, newPeers) + context become listening(newConnectedPeers) case Left(error) => handleConnectionErrors(error) } } - private def isConnectionHandled(remoteAddress: InetSocketAddress, pendingPeers: PeerMap, peers: PeerMap): Boolean = - !(peers ++ pendingPeers).values.map(_.remoteAddress).toSet.contains(remoteAddress) - - private def outgoingPeersAddresses(peers: PeerMap): Set[InetSocketAddress] = - peers.filter { case (_, p) => !p.incomingConnection }.values.map(_.remoteAddress).toSet - - private def countOutgoingPeers(peers: PeerMap): Int = peers.count { case (_, p) => !p.incomingConnection } - - private def countIncomingPeers(peers: PeerMap): Int = peers.count { case (_, p) => p.incomingConnection } - - def handleCommonMessages(pendingPeers: PeerMap, peers: PeerMap): Receive = { + def handleCommonMessages(connectedPeers: ConnectedPeers): Receive = { case GetPeers => - getPeers(peers.values.toSet).pipeTo(sender()) + getPeers(connectedPeers.peers.values.toSet).pipeTo(sender()) - case SendMessage(message, peerId) if peers.contains(peerId) => - peers(peerId).ref ! PeerActor.SendMessage(message) + case SendMessage(message, peerId) if connectedPeers.getPeer(peerId).isDefined => + connectedPeers.getPeer(peerId).get.ref ! PeerActor.SendMessage(message) case Terminated(ref) => - val terminatedPeers = terminatedPeersIds(ref, pendingPeers, peers) - - terminatedPeers.foreach { peerId => + val (terminatedPeersIds, newConnectedPeers) = connectedPeers.removeTerminatedPeer(ref) + terminatedPeersIds.foreach { peerId => peerEventBus ! Publish(PeerEvent.PeerDisconnected(peerId)) } - context unwatch ref - context become listen(pendingPeers -- terminatedPeers, peers -- terminatedPeers) - case PeerEvent.PeerHandshakeSuccessful(peer, _) if peer.incomingConnection => - if (countIncomingPeers(peers) >= peerConfiguration.maxIncomingPeers) { - peer.ref ! PeerActor.DisconnectPeer(Disconnect.Reasons.TooManyPeers) + context unwatch ref + context become listening(newConnectedPeers) + + case PeerEvent.PeerHandshakeSuccessful(handshakedPeer, _) => + if ( + handshakedPeer.incomingConnection && connectedPeers.incomingHandshakedPeersCount >= peerConfiguration.maxIncomingPeers + ) { + handshakedPeer.ref ! PeerActor.DisconnectPeer(Disconnect.Reasons.TooManyPeers) + } else if (handshakedPeer.nodeId.exists(connectedPeers.hasHandshakedWith)) { + // FIXME: peers received after handshake should always have their nodeId defined, we could maybe later distinguish + // it into PendingPeer/HandshakedPeer classes + + // Even though we do already validations for this, we might have missed it someone tried connecting to us at the + // same time as we do + log.debug(s"Disconnecting from ${handshakedPeer.remoteAddress} as we are already connected to him") + handshakedPeer.ref ! PeerActor.DisconnectPeer(Disconnect.Reasons.AlreadyConnected) } else { - context become listen(pendingPeers - peer.id, peers + (peer.id -> peer)) + context become listening(connectedPeers.promotePeerToHandshaked(handshakedPeer)) } } - private def terminatedPeersIds(ref: ActorRef, pendingPeers: PeerMap, peers: PeerMap): Iterable[PeerId] = { - (peers ++ pendingPeers).collect { case (id, Peer(_, peerRef, _)) if peerRef == ref => id } - } - private def createPeer( address: InetSocketAddress, incomingConnection: Boolean, - pendingPeers: PeerMap, - peers: PeerMap - ): (Peer, PeerMap, PeerMap) = { + connectedPeers: ConnectedPeers + ): (Peer, ConnectedPeers) = { val ref = peerFactory(context, address, incomingConnection) context watch ref - val peer = Peer(address, ref, incomingConnection) - val newPeer = peer.id -> peer + val pendingPeer = Peer(address, ref, incomingConnection, None) - val (newPendingPeers, newPeers) = if (peer.incomingConnection) { - (pendingPeers + newPeer, peers) - } else { - (pendingPeers, peers + newPeer) - } + val newConnectedPeers = connectedPeers.addNewPendingPeer(pendingPeer) - (peer, newPendingPeers, newPeers) + (pendingPeer, newConnectedPeers) } private def getPeers(peers: Set[Peer]): Future[Peers] = { diff --git a/src/main/scala/io/iohk/ethereum/network/discovery/PeerDiscoveryManager.scala b/src/main/scala/io/iohk/ethereum/network/discovery/PeerDiscoveryManager.scala index 0512bd21d9..58bd74577c 100644 --- a/src/main/scala/io/iohk/ethereum/network/discovery/PeerDiscoveryManager.scala +++ b/src/main/scala/io/iohk/ethereum/network/discovery/PeerDiscoveryManager.scala @@ -30,14 +30,19 @@ class PeerDiscoveryManager( var pingedNodes: Map[ByteString, PingInfo] = Map.empty - var nodesInfo: Map[ByteString, DiscoveryNodeInfo] = { + val startingNodes: Map[ByteString, DiscoveryNodeInfo] = { val knownNodesURIs = if (discoveryConfig.discoveryEnabled) knownNodesStorage.getKnownNodes() else Set.empty - val nodesInfo = knownNodesURIs.map(uri => DiscoveryNodeInfo.fromUri(uri)) ++ bootStrapNodesInfo - nodesInfo.map { nodeInfo => nodeInfo.node.id -> nodeInfo }.toMap + val startingNodesInfo = knownNodesURIs.map(uri => DiscoveryNodeInfo.fromUri(uri)) ++ bootStrapNodesInfo + val startingNodesInfoWithoutSelf = startingNodesInfo.filterNot { + _.node.id == ByteString(nodeStatusHolder.get().nodeId) + } + startingNodesInfoWithoutSelf.map { nodeInfo => nodeInfo.node.id -> nodeInfo }.toMap } + var nodesInfo: Map[ByteString, DiscoveryNodeInfo] = startingNodes + if (discoveryConfig.discoveryEnabled) { discoveryListener ! DiscoveryListener.Subscribe context.system.scheduler.scheduleWithFixedDelay( diff --git a/src/test/scala/io/iohk/ethereum/jsonrpc/DebugServiceSpec.scala b/src/test/scala/io/iohk/ethereum/jsonrpc/DebugServiceSpec.scala index be7cd00a75..35ef9b88c4 100644 --- a/src/test/scala/io/iohk/ethereum/jsonrpc/DebugServiceSpec.scala +++ b/src/test/scala/io/iohk/ethereum/jsonrpc/DebugServiceSpec.scala @@ -80,7 +80,7 @@ class DebugServiceSpec bestBlockHash = peerStatus.bestHash ) val peer1Probe = TestProbe() - val peer1 = Peer(new InetSocketAddress("127.0.0.1", 1), peer1Probe.ref, incomingConnection = false) + val peer1 = Peer(new InetSocketAddress("127.0.0.1", 1), peer1Probe.ref, false) val peer1Info: PeerInfo = initialPeerInfo.withForkAccepted(false) } } diff --git a/src/test/scala/io/iohk/ethereum/network/EtcPeerManagerSpec.scala b/src/test/scala/io/iohk/ethereum/network/EtcPeerManagerSpec.scala index f04a0964fc..d2b2953624 100644 --- a/src/test/scala/io/iohk/ethereum/network/EtcPeerManagerSpec.scala +++ b/src/test/scala/io/iohk/ethereum/network/EtcPeerManagerSpec.scala @@ -254,17 +254,19 @@ class EtcPeerManagerSpec extends AnyFlatSpec with Matchers { bestBlockHash = peerStatus.bestHash ) + val fakeNodeId = ByteString() + val peer1Probe = TestProbe() - val peer1 = Peer(new InetSocketAddress("127.0.0.1", 1), peer1Probe.ref, false) + val peer1 = Peer(new InetSocketAddress("127.0.0.1", 1), peer1Probe.ref, false, Some(fakeNodeId)) val peer1Info = initialPeerInfo.withForkAccepted(false) val peer2Probe = TestProbe() - val peer2 = Peer(new InetSocketAddress("127.0.0.1", 2), peer2Probe.ref, false) + val peer2 = Peer(new InetSocketAddress("127.0.0.1", 2), peer2Probe.ref, false, Some(fakeNodeId)) val peer2Info = initialPeerInfo.withForkAccepted(false) val peer3Probe = TestProbe() - val peer3 = Peer(new InetSocketAddress("127.0.0.1", 3), peer3Probe.ref, false) + val peer3 = Peer(new InetSocketAddress("127.0.0.1", 3), peer3Probe.ref, false, Some(fakeNodeId)) val freshPeerProbe = TestProbe() - val freshPeer = Peer(new InetSocketAddress("127.0.0.1", 4), freshPeerProbe.ref, false) + val freshPeer = Peer(new InetSocketAddress("127.0.0.1", 4), freshPeerProbe.ref, false, Some(fakeNodeId)) val freshPeerInfo = initialPeerInfo.withForkAccepted(false) val peerManager = TestProbe() diff --git a/src/test/scala/io/iohk/ethereum/network/PeerEventBusActorSpec.scala b/src/test/scala/io/iohk/ethereum/network/PeerEventBusActorSpec.scala index 5946b9602f..1d87db9854 100644 --- a/src/test/scala/io/iohk/ethereum/network/PeerEventBusActorSpec.scala +++ b/src/test/scala/io/iohk/ethereum/network/PeerEventBusActorSpec.scala @@ -4,6 +4,7 @@ import java.net.InetSocketAddress import akka.actor.ActorSystem import akka.testkit.TestProbe +import akka.util.ByteString import io.iohk.ethereum.Fixtures import io.iohk.ethereum.domain.ChainWeight import io.iohk.ethereum.network.PeerEventBusActor.PeerEvent.{MessageFromPeer, PeerDisconnected, PeerHandshakeSuccessful} @@ -97,7 +98,7 @@ class PeerEventBusActorSpec extends AnyFlatSpec with Matchers { peerEventBusActor.tell(PeerEventBusActor.Subscribe(PeerHandshaked), probe1.ref) peerEventBusActor.tell(PeerEventBusActor.Subscribe(PeerHandshaked), probe2.ref) - val peerHandshaked = new Peer(new InetSocketAddress("127.0.0.1", 0), TestProbe().ref, false) + val peerHandshaked = new Peer(new InetSocketAddress("127.0.0.1", 0), TestProbe().ref, false, Some(ByteString())) val msgPeerHandshaked = PeerHandshakeSuccessful(peerHandshaked, initialPeerInfo) peerEventBusActor ! PeerEventBusActor.Publish(msgPeerHandshaked) diff --git a/src/test/scala/io/iohk/ethereum/network/PeerManagerSpec.scala b/src/test/scala/io/iohk/ethereum/network/PeerManagerSpec.scala index d8ed34da64..0b233a7b80 100644 --- a/src/test/scala/io/iohk/ethereum/network/PeerManagerSpec.scala +++ b/src/test/scala/io/iohk/ethereum/network/PeerManagerSpec.scala @@ -3,11 +3,12 @@ package io.iohk.ethereum.network import java.net.{InetSocketAddress, URI} import akka.actor._ -import akka.testkit.{TestActorRef, TestProbe} +import akka.testkit.{TestActorRef, TestKit, TestProbe} +import akka.util.ByteString import com.miguno.akka.testing.VirtualTime import io.iohk.ethereum.domain.{Block, BlockBody, BlockHeader, ChainWeight} import io.iohk.ethereum.network.EtcPeerManagerActor.PeerInfo -import io.iohk.ethereum.network.PeerActor.PeerClosedConnection +import io.iohk.ethereum.network.PeerActor.{ConnectTo, PeerClosedConnection} import io.iohk.ethereum.network.PeerEventBusActor.PeerEvent.PeerDisconnected import io.iohk.ethereum.network.PeerEventBusActor.SubscriptionClassifier.PeerHandshaked import io.iohk.ethereum.network.PeerEventBusActor.{PeerEvent, Publish, Subscribe} @@ -18,30 +19,30 @@ import io.iohk.ethereum.network.p2p.messages.Versions import io.iohk.ethereum.network.p2p.messages.WireProtocol.Disconnect import io.iohk.ethereum.utils.Config import io.iohk.ethereum.{Fixtures, NormalPatience} +import org.bouncycastle.util.encoders.Hex import org.scalatest.concurrent.Eventually -import org.scalatest.flatspec.AnyFlatSpec +import org.scalatest.flatspec.AnyFlatSpecLike import org.scalatest.matchers.should.Matchers // scalastyle:off magic.number -class PeerManagerSpec extends AnyFlatSpec with Matchers with Eventually with NormalPatience { +class PeerManagerSpec extends TestKit(ActorSystem("PeerManagerSpec_System")) with AnyFlatSpecLike with Matchers with Eventually with NormalPatience { "PeerManager" should "try to connect to bootstrap and known nodes on startup" in new TestSetup { - startConnecting() - - system.terminate() + start() + handleInitialNodesDiscovery() } it should "blacklist peer that fail to establish tcp connection" in new TestSetup { + start() + handleInitialNodesDiscovery() - startConnecting() - - val probe: TestProbe = createdPeers(1) + val probe: TestProbe = createdPeers(1).probe probe.expectMsgClass(classOf[PeerActor.ConnectTo]) peerManager ! PeerManagerActor.HandlePeerConnection(incomingConnection1.ref, incomingPeerAddress1) - val probe2: TestProbe = createdPeers(2) + val probe2: TestProbe = createdPeers(2).probe val peer = Peer(incomingPeerAddress1, probe2.ref, incomingConnection = true) peerManager ! PeerClosedConnection(peer.remoteAddress.getHostString, Disconnect.Reasons.Other) @@ -49,14 +50,13 @@ class PeerManagerSpec extends AnyFlatSpec with Matchers with Eventually with Nor eventually { peerManager.underlyingActor.blacklistedPeers.size shouldEqual 1 } - - system.terminate() } it should "retry connections to remaining bootstrap nodes" in new TestSetup { - startConnecting() + start() + handleInitialNodesDiscovery() - val probe: TestProbe = createdPeers(1) + val probe: TestProbe = createdPeers(1).probe probe.expectMsgClass(classOf[PeerActor.ConnectTo]) @@ -68,24 +68,24 @@ class PeerManagerSpec extends AnyFlatSpec with Matchers with Eventually with Nor peerDiscoveryManager.expectMsg(PeerDiscoveryManager.GetDiscoveredNodesInfo) peerDiscoveryManager.reply(PeerDiscoveryManager.DiscoveredNodesInfo(bootstrapNodes)) - system.terminate() } it should "publish disconnect messages from peers" in new TestSetup { - startConnecting() + start() + handleInitialNodesDiscovery() - val probe: TestProbe = createdPeers(1) + val probe: TestProbe = createdPeers(1).probe probe.ref ! PoisonPill time.advance(21000) // connect to 2 bootstrap peers peerEventBus.expectMsg(Publish(PeerDisconnected(PeerId(probe.ref.path.name)))) - system.terminate() } it should "not handle the connection from a peer that's already connected" in new TestSetup { - startConnecting() + start() + handleInitialNodesDiscovery() val connection = TestProbe() @@ -95,14 +95,14 @@ class PeerManagerSpec extends AnyFlatSpec with Matchers with Eventually with Nor peerManager ! PeerManagerActor.HandlePeerConnection(connection.ref, new InetSocketAddress("127.0.0.1", 30340)) watcher.expectMsgClass(classOf[Terminated]) - system.terminate() } it should "handle pending and handshaked incoming peers" in new TestSetup { - startConnecting() + start() + handleInitialNodesDiscovery() - createdPeers.head.expectMsgClass(classOf[PeerActor.ConnectTo]) - createdPeers(1).expectMsgClass(classOf[PeerActor.ConnectTo]) + createdPeers.head.probe.expectMsgClass(classOf[PeerActor.ConnectTo]) + createdPeers(1).probe.expectMsgClass(classOf[PeerActor.ConnectTo]) time.advance(21000) // wait for next scan @@ -111,8 +111,8 @@ class PeerManagerSpec extends AnyFlatSpec with Matchers with Eventually with Nor peerManager ! PeerManagerActor.HandlePeerConnection(incomingConnection1.ref, incomingPeerAddress1) - val probe2: TestProbe = createdPeers(2) - val peer = Peer(incomingPeerAddress1, probe2.ref, incomingConnection = true) + val probe2: TestProbe = createdPeers(2).probe + val peer = Peer(incomingPeerAddress1, probe2.ref, incomingConnection = true, Some(incomingNodeId1)) probe2.expectMsg(PeerActor.HandleConnection(incomingConnection1.ref, incomingPeerAddress1)) probe2.reply(PeerEvent.PeerHandshakeSuccessful(peer, initialPeerInfo)) @@ -125,9 +125,9 @@ class PeerManagerSpec extends AnyFlatSpec with Matchers with Eventually with Nor watcher.expectMsgClass(classOf[Terminated]) - val probe3: TestProbe = createdPeers(3) + val probe3: TestProbe = createdPeers(3).probe - val secondPeer = Peer(incomingPeerAddress2, probe3.ref, incomingConnection = true) + val secondPeer = Peer(incomingPeerAddress2, probe3.ref, incomingConnection = true, Some(incomingNodeId2)) probe3.expectMsg(PeerActor.HandleConnection(incomingConnection2.ref, incomingPeerAddress2)) probe3.reply(PeerEvent.PeerHandshakeSuccessful(secondPeer, initialPeerInfo)) @@ -137,24 +137,23 @@ class PeerManagerSpec extends AnyFlatSpec with Matchers with Eventually with Nor probe3.ref ! PoisonPill peerEventBus.expectMsg(Publish(PeerDisconnected(PeerId(probe3.ref.path.name)))) - system.terminate() } it should "handle common message about getting peers" in new TestSetup { - startConnecting() + start() + handleInitialNodesDiscovery() val requestSender = TestProbe() requestSender.send(peerManager, GetPeers) requestSender.expectMsgClass(classOf[Peers]) - - system.terminate() } it should "handle common message about sending message to peer" in new TestSetup { - startConnecting() + start() + handleInitialNodesDiscovery() - val probe: TestProbe = createdPeers(1) + val probe: TestProbe = createdPeers(1).probe probe.expectMsgClass(classOf[PeerActor.ConnectTo]) @@ -164,16 +163,71 @@ class PeerManagerSpec extends AnyFlatSpec with Matchers with Eventually with Nor peerManager ! SendMessage(block, PeerId(probe.ref.path.name)) probe.expectMsg(PeerActor.SendMessage(block)) + } + + it should "disconnect from incoming peers already handshaked" in new TestSetup { + start() + handleInitialNodesDiscovery() + + // Finish handshake with the first of the bootstrap peers + val TestPeer(peerAsOutgoing, peerAsOutgoingProbe) = createdPeers.head + + val ConnectTo(uriConnectedTo) = peerAsOutgoingProbe.expectMsgClass(classOf[PeerActor.ConnectTo]) + val nodeId = ByteString(Hex.decode(uriConnectedTo.getUserInfo)) - system.terminate() + peerAsOutgoingProbe.reply(PeerEvent.PeerHandshakeSuccessful(peerAsOutgoing.copy(nodeId = Some(nodeId)), initialPeerInfo)) + + createdPeers(1).probe.expectMsgClass(classOf[PeerActor.ConnectTo]) + + // Repeated incoming connection from one of the bootstrap peers + val peerAsIncomingTcpConnection = incomingConnection1 + val peerAsIncomingAddress = incomingPeerAddress1 + + peerManager ! PeerManagerActor.HandlePeerConnection(peerAsIncomingTcpConnection.ref, peerAsIncomingAddress) + + val peerAsIncomingProbe = createdPeers.last.probe + val peerAsIncoming = Peer(peerAsIncomingAddress, peerAsIncomingProbe.ref, incomingConnection = true, Some(nodeId)) + + peerAsIncomingProbe.expectMsg(PeerActor.HandleConnection(peerAsIncomingTcpConnection.ref, peerAsIncoming.remoteAddress)) + peerAsIncomingProbe.reply(PeerEvent.PeerHandshakeSuccessful(peerAsIncoming, initialPeerInfo)) + + peerAsIncomingProbe.expectMsg(PeerActor.DisconnectPeer(Disconnect.Reasons.AlreadyConnected)) } - trait TestSetup { - implicit lazy val system: ActorSystem = ActorSystem("PeerManagerActorSpec_System") + it should "disconnect from outgoing peer if, while it was pending, the same peer hanshaked as incoming" in new TestSetup { + start() + handleInitialNodesDiscovery() + + // Keep both bootstrap peers as pending + val TestPeer(peerAsOutgoing, peerAsOutgoingProbe) = createdPeers.head + + val ConnectTo(uriConnectedTo) = peerAsOutgoingProbe.expectMsgClass(classOf[PeerActor.ConnectTo]) + val nodeId = ByteString(Hex.decode(uriConnectedTo.getUserInfo)) + createdPeers(1).probe.expectMsgClass(classOf[PeerActor.ConnectTo]) + + // Receive incoming connection from one of the bootstrap peers + val peerAsIncomingTcpConnection = incomingConnection1 + val peerAsIncomingAddress = incomingPeerAddress1 + + peerManager ! PeerManagerActor.HandlePeerConnection(peerAsIncomingTcpConnection.ref, peerAsIncomingAddress) + + val peerAsIncomingProbe = createdPeers.last.probe + val peerAsIncoming = Peer(peerAsIncomingAddress, peerAsIncomingProbe.ref, incomingConnection = true, Some(nodeId)) + + peerAsIncomingProbe.expectMsg(PeerActor.HandleConnection(peerAsIncomingTcpConnection.ref, peerAsIncoming.remoteAddress)) + peerAsIncomingProbe.reply(PeerEvent.PeerHandshakeSuccessful(peerAsIncoming, initialPeerInfo)) + + // Handshake with peer as outgoing is finished + peerAsOutgoingProbe.reply(PeerEvent.PeerHandshakeSuccessful(peerAsOutgoing.copy(nodeId = Some(nodeId)), initialPeerInfo)) + peerAsOutgoingProbe.expectMsg(PeerActor.DisconnectPeer(Disconnect.Reasons.AlreadyConnected)) + } + + trait TestSetup { val time = new VirtualTime - var createdPeers: Seq[TestProbe] = Seq.empty + case class TestPeer(peer: Peer, probe: TestProbe) + var createdPeers: Seq[TestPeer] = Seq.empty val peerConfiguration: PeerConfiguration = Config.Network.peer val discoveryConfig = DiscoveryConfig(Config.config, Config.blockchains.blockchainConfig.bootstrapNodes) @@ -187,18 +241,21 @@ class PeerManagerSpec extends AnyFlatSpec with Matchers with Eventually with Nor .map(PeerDiscoveryManager.DiscoveryNodeInfo.fromNode) val knownNodes: Set[URI] = Set.empty - val peerFactory: (ActorContext, InetSocketAddress, Boolean) => ActorRef = { (ctx, addr, _) => - val peer = TestProbe() - createdPeers :+= peer - peer.ref + val peerFactory: (ActorContext, InetSocketAddress, Boolean) => ActorRef = { (_, address, isIncoming) => + val peerProbe = TestProbe() + createdPeers :+= TestPeer(Peer(address, peerProbe.ref, isIncoming), peerProbe) + peerProbe.ref } - val incomingConnection1 = TestProbe() - val incomingConnection2 = TestProbe() - val incomingConnection3 = TestProbe() val port = 30340 + val incomingConnection1 = TestProbe() + val incomingNodeId1 = ByteString(1) val incomingPeerAddress1 = new InetSocketAddress("127.0.0.2", port) + val incomingConnection2 = TestProbe() + val incomingNodeId2 = ByteString(2) val incomingPeerAddress2 = new InetSocketAddress("127.0.0.3", port) + val incomingConnection3 = TestProbe() + val incomingNodeId3 = ByteString(3) val incomingPeerAddress3 = new InetSocketAddress("127.0.0.4", port) val peerStatus = Status( @@ -230,12 +287,15 @@ class PeerManagerSpec extends AnyFlatSpec with Matchers with Eventually with Nor ) )(system) - def startConnecting(): Unit = { + def start(): Unit = { + peerEventBus.expectMsg(Subscribe(PeerHandshaked)) + peerManager ! PeerManagerActor.StartConnecting + } + def handleInitialNodesDiscovery(): Unit = { time.advance(6000) // wait for bootstrap nodes scan - peerEventBus.expectMsg(Subscribe(PeerHandshaked)) peerDiscoveryManager.expectMsg(PeerDiscoveryManager.GetDiscoveredNodesInfo) peerDiscoveryManager.reply(PeerDiscoveryManager.DiscoveredNodesInfo(bootstrapNodes)) knownNodesManager.expectMsg(KnownNodesManager.GetKnownNodes)