From ef84bec51420429a2f14c57eb29f6e8c1f9c9c1b Mon Sep 17 00:00:00 2001 From: Nicolas Tallar Date: Tue, 27 Oct 2020 12:19:17 -0300 Subject: [PATCH 1/2] [ETCM-178] Disallow repeated connections and connections to self --- .../txExecTest/util/DumpChainActor.scala | 4 +- .../ethereum/network/ConnectedPeers.scala | 72 ++++++++ .../network/{peer.scala => Peer.scala} | 14 +- .../io/iohk/ethereum/network/PeerActor.scala | 19 ++- .../ethereum/network/PeerManagerActor.scala | 132 +++++++-------- .../discovery/PeerDiscoveryManager.scala | 11 +- .../ethereum/jsonrpc/DebugServiceSpec.scala | 2 +- .../ethereum/network/EtcPeerManagerSpec.scala | 10 +- .../network/PeerEventBusActorSpec.scala | 3 +- .../ethereum/network/PeerManagerSpec.scala | 154 ++++++++++++------ 10 files changed, 282 insertions(+), 139 deletions(-) create mode 100644 src/main/scala/io/iohk/ethereum/network/ConnectedPeers.scala rename src/main/scala/io/iohk/ethereum/network/{peer.scala => Peer.scala} (50%) diff --git a/src/it/scala/io/iohk/ethereum/txExecTest/util/DumpChainActor.scala b/src/it/scala/io/iohk/ethereum/txExecTest/util/DumpChainActor.scala index 759ab21c11..43c111f926 100644 --- a/src/it/scala/io/iohk/ethereum/txExecTest/util/DumpChainActor.scala +++ b/src/it/scala/io/iohk/ethereum/txExecTest/util/DumpChainActor.scala @@ -140,13 +140,13 @@ class DumpChainActor( val account = n.value.toArray[Byte].toAccount if (account.codeHash != DumpChainActor.emptyEvm) { - peers.headOption.foreach { case Peer(_, _, _) => + peers.headOption.foreach { _ => evmTorequest = evmTorequest :+ account.codeHash evmCodeHashes = evmCodeHashes + account.codeHash } } if (account.storageRoot != DumpChainActor.emptyStorage) { - peers.headOption.foreach { case Peer(_, _, _) => + peers.headOption.foreach { _ => contractChildren = contractChildren :+ account.storageRoot contractNodesHashes = contractNodesHashes + account.storageRoot } diff --git a/src/main/scala/io/iohk/ethereum/network/ConnectedPeers.scala b/src/main/scala/io/iohk/ethereum/network/ConnectedPeers.scala new file mode 100644 index 0000000000..df4ef21f53 --- /dev/null +++ b/src/main/scala/io/iohk/ethereum/network/ConnectedPeers.scala @@ -0,0 +1,72 @@ +package io.iohk.ethereum.network + +import java.net.InetSocketAddress + +import akka.actor.ActorRef +import akka.util.ByteString + +case class ConnectedPeers( + private val incomingPendingPeers: Map[PeerId, Peer], + private val outgoingPendingPeers: Map[PeerId, Peer], + private val handshakedPeers: Map[PeerId, Peer] +) { + + // FIXME: Kept only for compatibility purposes, should eventually be removed + lazy val peers: Map[PeerId, Peer] = outgoingPendingPeers ++ handshakedPeers + + private lazy val allPeers: Map[PeerId, Peer] = outgoingPendingPeers ++ handshakedPeers ++ incomingPendingPeers + + def isConnectionHandled(remoteAddress: InetSocketAddress): Boolean = + allPeers.values.map(_.remoteAddress).toSet.contains(remoteAddress) + + /* + We have the node id of our outgoing pending peers so we could use that in our checks, by rejecting a peer that + handshaked to us with the same node id. + However, with checking the node id of only handshaked peers we prioritize handshaked peers over pending ones, + in the above mentioned case the repeated pending peer connection will eventually die out + */ + def hasHandshakedWith(nodeId: ByteString): Boolean = + handshakedPeers.values.flatMap(_.nodeId).toSet.contains(nodeId) + + lazy val incomingPendingPeersCount: Int = incomingPendingPeers.size + lazy val incomingHandshakedPeersCount: Int = handshakedPeers.count { case (_, p) => p.incomingConnection } + lazy val outgoingPeersCount: Int = peers.count { case (_, p) => !p.incomingConnection } + + lazy val handshakedPeersCount: Int = handshakedPeers.size + lazy val pendingPeersCount: Int = incomingPendingPeersCount + outgoingPendingPeers.size + + def getPeer(peerId: PeerId): Option[Peer] = peers.get(peerId) + + def addNewPendingPeer(pendingPeer: Peer): ConnectedPeers = { + if (pendingPeer.incomingConnection) + copy(incomingPendingPeers = incomingPendingPeers + (pendingPeer.id -> pendingPeer)) + else + copy(outgoingPendingPeers = outgoingPendingPeers + (pendingPeer.id -> pendingPeer)) + } + + def promotePeerToHandshaked(peerAfterHandshake: Peer): ConnectedPeers = { + if (peerAfterHandshake.incomingConnection) + copy( + incomingPendingPeers = incomingPendingPeers - peerAfterHandshake.id, + handshakedPeers = handshakedPeers + (peerAfterHandshake.id -> peerAfterHandshake) + ) + else + copy( + outgoingPendingPeers = outgoingPendingPeers - peerAfterHandshake.id, + handshakedPeers = handshakedPeers + (peerAfterHandshake.id -> peerAfterHandshake) + ) + } + + def removeTerminatedPeer(peerRef: ActorRef): (Iterable[PeerId], ConnectedPeers) = { + val peersId = allPeers.collect { case (id, peer) if peer.ref == peerRef => id } + + ( + peersId, + ConnectedPeers(incomingPendingPeers -- peersId, outgoingPendingPeers -- peersId, handshakedPeers -- peersId) + ) + } +} + +object ConnectedPeers { + def empty: ConnectedPeers = ConnectedPeers(Map.empty, Map.empty, Map.empty) +} diff --git a/src/main/scala/io/iohk/ethereum/network/peer.scala b/src/main/scala/io/iohk/ethereum/network/Peer.scala similarity index 50% rename from src/main/scala/io/iohk/ethereum/network/peer.scala rename to src/main/scala/io/iohk/ethereum/network/Peer.scala index 64b1777786..7c290001b4 100644 --- a/src/main/scala/io/iohk/ethereum/network/peer.scala +++ b/src/main/scala/io/iohk/ethereum/network/Peer.scala @@ -3,11 +3,21 @@ package io.iohk.ethereum.network import java.net.InetSocketAddress import akka.actor.ActorRef +import akka.util.ByteString import io.iohk.ethereum.blockchain.sync.BlacklistSupport.BlackListId case class PeerId(value: String) extends BlackListId -case class Peer(remoteAddress: InetSocketAddress, ref: ActorRef, incomingConnection: Boolean) { +object PeerId { + def fromRef(ref: ActorRef): PeerId = PeerId(ref.path.name) +} + +case class Peer( + remoteAddress: InetSocketAddress, + ref: ActorRef, + incomingConnection: Boolean, + nodeId: Option[ByteString] = None +) { // FIXME PeerId should be actual peerId i.e id derived form node public key - def id: PeerId = PeerId(ref.path.name) + def id: PeerId = PeerId.fromRef(ref) } diff --git a/src/main/scala/io/iohk/ethereum/network/PeerActor.scala b/src/main/scala/io/iohk/ethereum/network/PeerActor.scala index 357c94b91a..09a10c6804 100644 --- a/src/main/scala/io/iohk/ethereum/network/PeerActor.scala +++ b/src/main/scala/io/iohk/ethereum/network/PeerActor.scala @@ -49,9 +49,7 @@ class PeerActor[R <: HandshakeResult]( def scheduler: Scheduler = externalSchedulerOpt getOrElse system.scheduler - val peerId: PeerId = PeerId(self.path.name) - - val peer: Peer = Peer(peerAddress, self, incomingConnection) + val peerId: PeerId = PeerId.fromRef(self) override def receive: Receive = waitingForInitialCommand @@ -87,7 +85,7 @@ class PeerActor[R <: HandshakeResult]( case RLPxConnectionHandler.ConnectionEstablished(remoteNodeId) => val newUri = rlpxConnection.uriOpt.map(outGoingUri => modifyOutGoingUri(remoteNodeId, rlpxConnection, outGoingUri)) - processHandshakerNextMessage(initHandshaker, rlpxConnection.copy(uriOpt = newUri), numRetries) + processHandshakerNextMessage(initHandshaker, remoteNodeId, rlpxConnection.copy(uriOpt = newUri), numRetries) case RLPxConnectionHandler.ConnectionFailed => log.debug("Failed to establish RLPx connection") @@ -109,6 +107,7 @@ class PeerActor[R <: HandshakeResult]( def processingHandshaking( handshaker: Handshaker[R], + remoteNodeId: ByteString, rlpxConnection: RLPxConnection, timeout: Cancellable, numRetries: Int @@ -122,14 +121,14 @@ class PeerActor[R <: HandshakeResult]( // handles the received message handshaker.applyMessage(msg).foreach { newHandshaker => timeout.cancel() - processHandshakerNextMessage(newHandshaker, rlpxConnection, numRetries) + processHandshakerNextMessage(newHandshaker, remoteNodeId, rlpxConnection, numRetries) } handshaker.respondToRequest(msg).foreach(msgToSend => rlpxConnection.sendMessage(msgToSend)) case ResponseTimeout => timeout.cancel() val newHandshaker = handshaker.processTimeout - processHandshakerNextMessage(newHandshaker, rlpxConnection, numRetries) + processHandshakerNextMessage(newHandshaker, remoteNodeId, rlpxConnection, numRetries) case GetStatus => sender() ! StatusResponse(Handshaking(numRetries)) @@ -145,6 +144,7 @@ class PeerActor[R <: HandshakeResult]( */ private def processHandshakerNextMessage( handshaker: Handshaker[R], + remoteNodeId: ByteString, rlpxConnection: RLPxConnection, numRetries: Int ): Unit = @@ -152,11 +152,11 @@ class PeerActor[R <: HandshakeResult]( case Right(NextMessage(msgToSend, timeoutTime)) => rlpxConnection.sendMessage(msgToSend) val newTimeout = scheduler.scheduleOnce(timeoutTime, self, ResponseTimeout) - context become processingHandshaking(handshaker, rlpxConnection, newTimeout, numRetries) + context become processingHandshaking(handshaker, remoteNodeId, rlpxConnection, newTimeout, numRetries) case Left(HandshakeSuccess(handshakeResult)) => rlpxConnection.uriOpt.foreach { uri => knownNodesManager ! KnownNodesManager.AddKnownNode(uri) } - context become new HandshakedPeer(rlpxConnection, handshakeResult).receive + context become new HandshakedPeer(remoteNodeId, rlpxConnection, handshakeResult).receive unstashAll() case Left(HandshakeFailure(reason)) => @@ -244,8 +244,9 @@ class PeerActor[R <: HandshakeResult]( stash() } - class HandshakedPeer(rlpxConnection: RLPxConnection, handshakeResult: R) { + class HandshakedPeer(remoteNodeId: ByteString, rlpxConnection: RLPxConnection, handshakeResult: R) { + val peer: Peer = Peer(peerAddress, self, incomingConnection, Some(remoteNodeId)) peerEventBus ! Publish(PeerHandshakeSuccessful(peer, handshakeResult)) /** diff --git a/src/main/scala/io/iohk/ethereum/network/PeerManagerActor.scala b/src/main/scala/io/iohk/ethereum/network/PeerManagerActor.scala index 0b2a585917..7e1ed32eee 100644 --- a/src/main/scala/io/iohk/ethereum/network/PeerManagerActor.scala +++ b/src/main/scala/io/iohk/ethereum/network/PeerManagerActor.scala @@ -4,7 +4,7 @@ import java.net.{InetSocketAddress, URI} import akka.actor.SupervisorStrategy.Stop import akka.actor._ -import akka.util.Timeout +import akka.util.{ByteString, Timeout} import io.iohk.ethereum.blockchain.sync.BlacklistSupport import io.iohk.ethereum.blockchain.sync.BlacklistSupport.BlackListId import io.iohk.ethereum.network.PeerActor.PeerClosedConnection @@ -18,6 +18,7 @@ import io.iohk.ethereum.network.p2p.messages.WireProtocol.Disconnect import io.iohk.ethereum.network.p2p.{MessageDecoder, MessageSerializable} import io.iohk.ethereum.network.rlpx.AuthHandshaker import io.iohk.ethereum.network.rlpx.RLPxConnectionHandler.RLPxConfiguration +import org.bouncycastle.util.encoders.Hex import scala.concurrent.ExecutionContext.Implicits.global import scala.concurrent.Future @@ -59,12 +60,10 @@ class PeerManagerActor( Stop } - override def receive: Receive = start(Map.empty, Map.empty) - - def start(pendingPeers: PeerMap, peers: PeerMap): Receive = { case StartConnecting => + override def receive: Receive = { case StartConnecting => scheduleNodesUpdate() knownNodesManager ! KnownNodesManager.GetKnownNodes - context become listen(pendingPeers, peers) + context become listening(ConnectedPeers.empty) unstashAll() } @@ -77,16 +76,16 @@ class PeerManagerActor( ) } - def listen(pendingPeers: PeerMap, peers: PeerMap): Receive = { - handleCommonMessages(pendingPeers, peers) orElse + def listening(connectedPeers: ConnectedPeers): Receive = { + handleCommonMessages(connectedPeers) orElse handleBlacklistMessages orElse - connections(pendingPeers, peers) orElse - nodes(pendingPeers, peers) orElse { case _ => + connections(connectedPeers) orElse + handleNewNodesToConnectMessages(connectedPeers) orElse { case _ => stash() } } - def nodes(pendingPeers: PeerMap, peers: PeerMap): Receive = { + def handleNewNodesToConnectMessages(connectedPeers: ConnectedPeers): Receive = { case KnownNodesManager.KnownNodes(nodes) => val nodesToConnect = nodes.take(peerConfiguration.maxOutgoingPeers) @@ -98,23 +97,25 @@ class PeerManagerActor( } case PeerDiscoveryManager.DiscoveredNodesInfo(nodesInfo) => - val peerAddresses = outgoingPeersAddresses(peers) - val nodesToConnect = nodesInfo .filterNot { discoveryNodeInfo => val socketAddress = discoveryNodeInfo.node.tcpSocketAddress - peerAddresses.contains(socketAddress) || isBlacklisted(PeerAddress(socketAddress.getHostString)) + val alreadyConnected = connectedPeers.isConnectionHandled(socketAddress) || connectedPeers.hasHandshakedWith( + discoveryNodeInfo.node.id + ) + alreadyConnected || isBlacklisted(PeerAddress(socketAddress.getHostString)) } // not already connected to or blacklisted - .take(peerConfiguration.maxOutgoingPeers - peerAddresses.size) + .take(peerConfiguration.maxOutgoingPeers - connectedPeers.outgoingPeersCount) NetworkMetrics.DiscoveredPeersSize.set(nodesInfo.size) NetworkMetrics.BlacklistedPeersSize.set(blacklistedPeers.size) - NetworkMetrics.PendingPeersSize.set(pendingPeers.size) + NetworkMetrics.PendingPeersSize.set(connectedPeers.pendingPeersCount) log.info( s"Discovered ${nodesInfo.size} nodes, " + s"Blacklisted ${blacklistedPeers.size} nodes, " + - s"connected to ${peers.size}/${peerConfiguration.maxOutgoingPeers + peerConfiguration.maxIncomingPeers}. " + + s"handshaked to ${connectedPeers.handshakedPeersCount}/${peerConfiguration.maxOutgoingPeers + peerConfiguration.maxIncomingPeers}, " + + s"pending connection attempts ${connectedPeers.pendingPeersCount}. " + s"Trying to connect to ${nodesToConnect.size} more nodes." ) @@ -126,7 +127,7 @@ class PeerManagerActor( } } - def connections(pendingPeers: PeerMap, peers: PeerMap): Receive = { + def connections(connectedPeers: ConnectedPeers): Receive = { case PeerClosedConnection(peerAddress, reason) => blacklist( PeerAddress(peerAddress), @@ -135,10 +136,10 @@ class PeerManagerActor( ) case HandlePeerConnection(connection, remoteAddress) => - handleConnection(connection, remoteAddress, pendingPeers, peers) + handleConnection(connection, remoteAddress, connectedPeers) case ConnectToPeer(uri) => - connectWith(uri, pendingPeers, peers) + connectWith(uri, connectedPeers) } def getBlacklistDuration(reason: Long): FiniteDuration = { @@ -152,17 +153,16 @@ class PeerManagerActor( private def handleConnection( connection: ActorRef, remoteAddress: InetSocketAddress, - pendingPeers: PeerMap, - peers: PeerMap + connectedPeers: ConnectedPeers ): Unit = { - val connectionHandled = isConnectionHandled(remoteAddress, pendingPeers, peers) - val isPendingPeersNotMaxValue = pendingPeers.size < peerConfiguration.maxPendingPeers + val alreadyConnectedToPeer = connectedPeers.isConnectionHandled(remoteAddress) + val isPendingPeersNotMaxValue = connectedPeers.incomingPendingPeersCount < peerConfiguration.maxPendingPeers val validConnection = for { validHandler <- validateConnection( remoteAddress, IncomingConnectionAlreadyHandled(remoteAddress, connection), - connectionHandled + !alreadyConnectedToPeer ) validNumber <- validateConnection( validHandler, @@ -173,90 +173,82 @@ class PeerManagerActor( validConnection match { case Right(address) => - val (peer, newPendingPeers, newPeers) = createPeer(address, incomingConnection = true, pendingPeers, peers) + val (peer, newConnectedPeers) = createPeer(address, incomingConnection = true, connectedPeers) peer.ref ! PeerActor.HandleConnection(connection, remoteAddress) - context become listen(newPendingPeers, newPeers) + context become listening(newConnectedPeers) case Left(error) => handleConnectionErrors(error) } } - private def connectWith(uri: URI, pendingPeers: PeerMap, peers: PeerMap): Unit = { + private def connectWith(uri: URI, connectedPeers: ConnectedPeers): Unit = { + val nodeId = ByteString(Hex.decode(uri.getUserInfo)) val remoteAddress = new InetSocketAddress(uri.getHost, uri.getPort) - val connectionHandled = isConnectionHandled(remoteAddress, pendingPeers, peers) - val isOutgoingPeersNotMaxValue = countOutgoingPeers(peers) < peerConfiguration.maxOutgoingPeers + val alreadyConnectedToPeer = + connectedPeers.hasHandshakedWith(nodeId) || connectedPeers.isConnectionHandled(remoteAddress) + val isOutgoingPeersNotMaxValue = connectedPeers.outgoingPeersCount < peerConfiguration.maxOutgoingPeers val validConnection = for { - validHandler <- validateConnection(remoteAddress, OutgoingConnectionAlreadyHandled(uri), connectionHandled) + validHandler <- validateConnection(remoteAddress, OutgoingConnectionAlreadyHandled(uri), !alreadyConnectedToPeer) validNumber <- validateConnection(validHandler, MaxOutgoingConnections, isOutgoingPeersNotMaxValue) } yield validNumber validConnection match { case Right(address) => - val (peer, newPendingPeers, newPeers) = createPeer(address, incomingConnection = false, pendingPeers, peers) + val (peer, newConnectedPeers) = createPeer(address, incomingConnection = false, connectedPeers) peer.ref ! PeerActor.ConnectTo(uri) - context become listen(newPendingPeers, newPeers) + context become listening(newConnectedPeers) case Left(error) => handleConnectionErrors(error) } } - private def isConnectionHandled(remoteAddress: InetSocketAddress, pendingPeers: PeerMap, peers: PeerMap): Boolean = - !(peers ++ pendingPeers).values.map(_.remoteAddress).toSet.contains(remoteAddress) - - private def outgoingPeersAddresses(peers: PeerMap): Set[InetSocketAddress] = - peers.filter { case (_, p) => !p.incomingConnection }.values.map(_.remoteAddress).toSet - - private def countOutgoingPeers(peers: PeerMap): Int = peers.count { case (_, p) => !p.incomingConnection } - - private def countIncomingPeers(peers: PeerMap): Int = peers.count { case (_, p) => p.incomingConnection } - - def handleCommonMessages(pendingPeers: PeerMap, peers: PeerMap): Receive = { + def handleCommonMessages(connectedPeers: ConnectedPeers): Receive = { case GetPeers => - getPeers(peers.values.toSet).pipeTo(sender()) + getPeers(connectedPeers.peers.values.toSet).pipeTo(sender()) - case SendMessage(message, peerId) if peers.contains(peerId) => - peers(peerId).ref ! PeerActor.SendMessage(message) + case SendMessage(message, peerId) if connectedPeers.getPeer(peerId).isDefined => + connectedPeers.getPeer(peerId).get.ref ! PeerActor.SendMessage(message) case Terminated(ref) => - val terminatedPeers = terminatedPeersIds(ref, pendingPeers, peers) - - terminatedPeers.foreach { peerId => + val (terminatedPeersIds, newConnectedPeers) = connectedPeers.removeTerminatedPeer(ref) + terminatedPeersIds.foreach { peerId => peerEventBus ! Publish(PeerEvent.PeerDisconnected(peerId)) } - context unwatch ref - context become listen(pendingPeers -- terminatedPeers, peers -- terminatedPeers) - case PeerEvent.PeerHandshakeSuccessful(peer, _) if peer.incomingConnection => - if (countIncomingPeers(peers) >= peerConfiguration.maxIncomingPeers) { - peer.ref ! PeerActor.DisconnectPeer(Disconnect.Reasons.TooManyPeers) + context unwatch ref + context become listening(newConnectedPeers) + + case PeerEvent.PeerHandshakeSuccessful(handshakedPeer, _) => + if ( + handshakedPeer.incomingConnection && connectedPeers.incomingHandshakedPeersCount >= peerConfiguration.maxIncomingPeers + ) { + handshakedPeer.ref ! PeerActor.DisconnectPeer(Disconnect.Reasons.TooManyPeers) + } else if (handshakedPeer.nodeId.exists(connectedPeers.hasHandshakedWith)) { + // FIXME: peers received after handshake should always have their nodeId defined, we could maybe later distinguish + // it into PendingPeer/HandshakedPeer classes + + // Even though we do already validations for this, we might have missed it someone tried connecting to us at the + // same time as we do + log.debug(s"Disconnecting from ${handshakedPeer.remoteAddress} as we are already connected to him") + handshakedPeer.ref ! PeerActor.DisconnectPeer(Disconnect.Reasons.AlreadyConnected) } else { - context become listen(pendingPeers - peer.id, peers + (peer.id -> peer)) + context become listening(connectedPeers.promotePeerToHandshaked(handshakedPeer)) } } - private def terminatedPeersIds(ref: ActorRef, pendingPeers: PeerMap, peers: PeerMap): Iterable[PeerId] = { - (peers ++ pendingPeers).collect { case (id, Peer(_, peerRef, _)) if peerRef == ref => id } - } - private def createPeer( address: InetSocketAddress, incomingConnection: Boolean, - pendingPeers: PeerMap, - peers: PeerMap - ): (Peer, PeerMap, PeerMap) = { + connectedPeers: ConnectedPeers + ): (Peer, ConnectedPeers) = { val ref = peerFactory(context, address, incomingConnection) context watch ref - val peer = Peer(address, ref, incomingConnection) - val newPeer = peer.id -> peer + val pendingPeer = Peer(address, ref, incomingConnection, None) - val (newPendingPeers, newPeers) = if (peer.incomingConnection) { - (pendingPeers + newPeer, peers) - } else { - (pendingPeers, peers + newPeer) - } + val newConnectedPeers = connectedPeers.addNewPendingPeer(pendingPeer) - (peer, newPendingPeers, newPeers) + (pendingPeer, newConnectedPeers) } private def getPeers(peers: Set[Peer]): Future[Peers] = { diff --git a/src/main/scala/io/iohk/ethereum/network/discovery/PeerDiscoveryManager.scala b/src/main/scala/io/iohk/ethereum/network/discovery/PeerDiscoveryManager.scala index 0512bd21d9..58bd74577c 100644 --- a/src/main/scala/io/iohk/ethereum/network/discovery/PeerDiscoveryManager.scala +++ b/src/main/scala/io/iohk/ethereum/network/discovery/PeerDiscoveryManager.scala @@ -30,14 +30,19 @@ class PeerDiscoveryManager( var pingedNodes: Map[ByteString, PingInfo] = Map.empty - var nodesInfo: Map[ByteString, DiscoveryNodeInfo] = { + val startingNodes: Map[ByteString, DiscoveryNodeInfo] = { val knownNodesURIs = if (discoveryConfig.discoveryEnabled) knownNodesStorage.getKnownNodes() else Set.empty - val nodesInfo = knownNodesURIs.map(uri => DiscoveryNodeInfo.fromUri(uri)) ++ bootStrapNodesInfo - nodesInfo.map { nodeInfo => nodeInfo.node.id -> nodeInfo }.toMap + val startingNodesInfo = knownNodesURIs.map(uri => DiscoveryNodeInfo.fromUri(uri)) ++ bootStrapNodesInfo + val startingNodesInfoWithoutSelf = startingNodesInfo.filterNot { + _.node.id == ByteString(nodeStatusHolder.get().nodeId) + } + startingNodesInfoWithoutSelf.map { nodeInfo => nodeInfo.node.id -> nodeInfo }.toMap } + var nodesInfo: Map[ByteString, DiscoveryNodeInfo] = startingNodes + if (discoveryConfig.discoveryEnabled) { discoveryListener ! DiscoveryListener.Subscribe context.system.scheduler.scheduleWithFixedDelay( diff --git a/src/test/scala/io/iohk/ethereum/jsonrpc/DebugServiceSpec.scala b/src/test/scala/io/iohk/ethereum/jsonrpc/DebugServiceSpec.scala index 8d38f541ee..e7c43ff2fc 100644 --- a/src/test/scala/io/iohk/ethereum/jsonrpc/DebugServiceSpec.scala +++ b/src/test/scala/io/iohk/ethereum/jsonrpc/DebugServiceSpec.scala @@ -78,7 +78,7 @@ class DebugServiceSpec extends AnyFlatSpec with Matchers with MockFactory with S bestBlockHash = peerStatus.bestHash ) val peer1Probe = TestProbe() - val peer1 = Peer(new InetSocketAddress("127.0.0.1", 1), peer1Probe.ref, incomingConnection = false) + val peer1 = Peer(new InetSocketAddress("127.0.0.1", 1), peer1Probe.ref, false) val peer1Info: PeerInfo = initialPeerInfo.withForkAccepted(false) } } diff --git a/src/test/scala/io/iohk/ethereum/network/EtcPeerManagerSpec.scala b/src/test/scala/io/iohk/ethereum/network/EtcPeerManagerSpec.scala index 477e53c592..f283f5d5cf 100644 --- a/src/test/scala/io/iohk/ethereum/network/EtcPeerManagerSpec.scala +++ b/src/test/scala/io/iohk/ethereum/network/EtcPeerManagerSpec.scala @@ -255,17 +255,19 @@ class EtcPeerManagerSpec extends AnyFlatSpec with Matchers { bestBlockHash = peerStatus.bestHash ) + val fakeNodeId = ByteString() + val peer1Probe = TestProbe() - val peer1 = Peer(new InetSocketAddress("127.0.0.1", 1), peer1Probe.ref, false) + val peer1 = Peer(new InetSocketAddress("127.0.0.1", 1), peer1Probe.ref, false, Some(fakeNodeId)) val peer1Info = initialPeerInfo.withForkAccepted(false) val peer2Probe = TestProbe() - val peer2 = Peer(new InetSocketAddress("127.0.0.1", 2), peer2Probe.ref, false) + val peer2 = Peer(new InetSocketAddress("127.0.0.1", 2), peer2Probe.ref, false, Some(fakeNodeId)) val peer2Info = initialPeerInfo.withForkAccepted(false) val peer3Probe = TestProbe() - val peer3 = Peer(new InetSocketAddress("127.0.0.1", 3), peer3Probe.ref, false) + val peer3 = Peer(new InetSocketAddress("127.0.0.1", 3), peer3Probe.ref, false, Some(fakeNodeId)) val freshPeerProbe = TestProbe() - val freshPeer = Peer(new InetSocketAddress("127.0.0.1", 4), freshPeerProbe.ref, false) + val freshPeer = Peer(new InetSocketAddress("127.0.0.1", 4), freshPeerProbe.ref, false, Some(fakeNodeId)) val freshPeerInfo = initialPeerInfo.withForkAccepted(false) val peerManager = TestProbe() diff --git a/src/test/scala/io/iohk/ethereum/network/PeerEventBusActorSpec.scala b/src/test/scala/io/iohk/ethereum/network/PeerEventBusActorSpec.scala index 64be00f2c9..03f370e17b 100644 --- a/src/test/scala/io/iohk/ethereum/network/PeerEventBusActorSpec.scala +++ b/src/test/scala/io/iohk/ethereum/network/PeerEventBusActorSpec.scala @@ -4,6 +4,7 @@ import java.net.InetSocketAddress import akka.actor.ActorSystem import akka.testkit.TestProbe +import akka.util.ByteString import io.iohk.ethereum.Fixtures import io.iohk.ethereum.network.PeerEventBusActor.PeerEvent.{MessageFromPeer, PeerDisconnected, PeerHandshakeSuccessful} import io.iohk.ethereum.network.PeerEventBusActor.PeerSelector @@ -96,7 +97,7 @@ class PeerEventBusActorSpec extends AnyFlatSpec with Matchers { peerEventBusActor.tell(PeerEventBusActor.Subscribe(PeerHandshaked), probe1.ref) peerEventBusActor.tell(PeerEventBusActor.Subscribe(PeerHandshaked), probe2.ref) - val peerHandshaked = new Peer(new InetSocketAddress("127.0.0.1", 0), TestProbe().ref, false) + val peerHandshaked = new Peer(new InetSocketAddress("127.0.0.1", 0), TestProbe().ref, false, Some(ByteString())) val msgPeerHandshaked = PeerHandshakeSuccessful(peerHandshaked, initialPeerInfo) peerEventBusActor ! PeerEventBusActor.Publish(msgPeerHandshaked) diff --git a/src/test/scala/io/iohk/ethereum/network/PeerManagerSpec.scala b/src/test/scala/io/iohk/ethereum/network/PeerManagerSpec.scala index 8333a4d0fa..10124bc3b4 100644 --- a/src/test/scala/io/iohk/ethereum/network/PeerManagerSpec.scala +++ b/src/test/scala/io/iohk/ethereum/network/PeerManagerSpec.scala @@ -3,11 +3,12 @@ package io.iohk.ethereum.network import java.net.{InetSocketAddress, URI} import akka.actor._ -import akka.testkit.{TestActorRef, TestProbe} +import akka.testkit.{TestActorRef, TestKit, TestProbe} +import akka.util.ByteString import com.miguno.akka.testing.VirtualTime import io.iohk.ethereum.domain.{Block, BlockBody, BlockHeader} import io.iohk.ethereum.network.EtcPeerManagerActor.PeerInfo -import io.iohk.ethereum.network.PeerActor.PeerClosedConnection +import io.iohk.ethereum.network.PeerActor.{ConnectTo, PeerClosedConnection} import io.iohk.ethereum.network.PeerEventBusActor.PeerEvent.PeerDisconnected import io.iohk.ethereum.network.PeerEventBusActor.SubscriptionClassifier.PeerHandshaked import io.iohk.ethereum.network.PeerEventBusActor.{PeerEvent, Publish, Subscribe} @@ -18,30 +19,30 @@ import io.iohk.ethereum.network.p2p.messages.Versions import io.iohk.ethereum.network.p2p.messages.WireProtocol.Disconnect import io.iohk.ethereum.utils.Config import io.iohk.ethereum.{Fixtures, NormalPatience} +import org.bouncycastle.util.encoders.Hex import org.scalatest.concurrent.Eventually -import org.scalatest.flatspec.AnyFlatSpec +import org.scalatest.flatspec.AnyFlatSpecLike import org.scalatest.matchers.should.Matchers // scalastyle:off magic.number -class PeerManagerSpec extends AnyFlatSpec with Matchers with Eventually with NormalPatience { +class PeerManagerSpec extends TestKit(ActorSystem("PeerManagerSpec_System")) with AnyFlatSpecLike with Matchers with Eventually with NormalPatience { "PeerManager" should "try to connect to bootstrap and known nodes on startup" in new TestSetup { - startConnecting() - - system.terminate() + start() + handleInitialNodesDiscovery() } it should "blacklist peer that fail to establish tcp connection" in new TestSetup { + start() + handleInitialNodesDiscovery() - startConnecting() - - val probe: TestProbe = createdPeers(1) + val probe: TestProbe = createdPeers(1).probe probe.expectMsgClass(classOf[PeerActor.ConnectTo]) peerManager ! PeerManagerActor.HandlePeerConnection(incomingConnection1.ref, incomingPeerAddress1) - val probe2: TestProbe = createdPeers(2) + val probe2: TestProbe = createdPeers(2).probe val peer = Peer(incomingPeerAddress1, probe2.ref, incomingConnection = true) peerManager ! PeerClosedConnection(peer.remoteAddress.getHostString, Disconnect.Reasons.Other) @@ -49,14 +50,13 @@ class PeerManagerSpec extends AnyFlatSpec with Matchers with Eventually with Nor eventually { peerManager.underlyingActor.blacklistedPeers.size shouldEqual 1 } - - system.terminate() } it should "retry connections to remaining bootstrap nodes" in new TestSetup { - startConnecting() + start() + handleInitialNodesDiscovery() - val probe: TestProbe = createdPeers(1) + val probe: TestProbe = createdPeers(1).probe probe.expectMsgClass(classOf[PeerActor.ConnectTo]) @@ -68,24 +68,24 @@ class PeerManagerSpec extends AnyFlatSpec with Matchers with Eventually with Nor peerDiscoveryManager.expectMsg(PeerDiscoveryManager.GetDiscoveredNodesInfo) peerDiscoveryManager.reply(PeerDiscoveryManager.DiscoveredNodesInfo(bootstrapNodes)) - system.terminate() } it should "publish disconnect messages from peers" in new TestSetup { - startConnecting() + start() + handleInitialNodesDiscovery() - val probe: TestProbe = createdPeers(1) + val probe: TestProbe = createdPeers(1).probe probe.ref ! PoisonPill time.advance(21000) // connect to 2 bootstrap peers peerEventBus.expectMsg(Publish(PeerDisconnected(PeerId(probe.ref.path.name)))) - system.terminate() } it should "not handle the connection from a peer that's already connected" in new TestSetup { - startConnecting() + start() + handleInitialNodesDiscovery() val connection = TestProbe() @@ -95,14 +95,14 @@ class PeerManagerSpec extends AnyFlatSpec with Matchers with Eventually with Nor peerManager ! PeerManagerActor.HandlePeerConnection(connection.ref, new InetSocketAddress("127.0.0.1", 30340)) watcher.expectMsgClass(classOf[Terminated]) - system.terminate() } it should "handle pending and handshaked incoming peers" in new TestSetup { - startConnecting() + start() + handleInitialNodesDiscovery() - createdPeers.head.expectMsgClass(classOf[PeerActor.ConnectTo]) - createdPeers(1).expectMsgClass(classOf[PeerActor.ConnectTo]) + createdPeers.head.probe.expectMsgClass(classOf[PeerActor.ConnectTo]) + createdPeers(1).probe.expectMsgClass(classOf[PeerActor.ConnectTo]) time.advance(21000) // wait for next scan @@ -111,8 +111,8 @@ class PeerManagerSpec extends AnyFlatSpec with Matchers with Eventually with Nor peerManager ! PeerManagerActor.HandlePeerConnection(incomingConnection1.ref, incomingPeerAddress1) - val probe2: TestProbe = createdPeers(2) - val peer = Peer(incomingPeerAddress1, probe2.ref, incomingConnection = true) + val probe2: TestProbe = createdPeers(2).probe + val peer = Peer(incomingPeerAddress1, probe2.ref, incomingConnection = true, Some(incomingNodeId1)) probe2.expectMsg(PeerActor.HandleConnection(incomingConnection1.ref, incomingPeerAddress1)) probe2.reply(PeerEvent.PeerHandshakeSuccessful(peer, initialPeerInfo)) @@ -125,9 +125,9 @@ class PeerManagerSpec extends AnyFlatSpec with Matchers with Eventually with Nor watcher.expectMsgClass(classOf[Terminated]) - val probe3: TestProbe = createdPeers(3) + val probe3: TestProbe = createdPeers(3).probe - val secondPeer = Peer(incomingPeerAddress2, probe3.ref, incomingConnection = true) + val secondPeer = Peer(incomingPeerAddress2, probe3.ref, incomingConnection = true, Some(incomingNodeId2)) probe3.expectMsg(PeerActor.HandleConnection(incomingConnection2.ref, incomingPeerAddress2)) probe3.reply(PeerEvent.PeerHandshakeSuccessful(secondPeer, initialPeerInfo)) @@ -137,24 +137,23 @@ class PeerManagerSpec extends AnyFlatSpec with Matchers with Eventually with Nor probe3.ref ! PoisonPill peerEventBus.expectMsg(Publish(PeerDisconnected(PeerId(probe3.ref.path.name)))) - system.terminate() } it should "handle common message about getting peers" in new TestSetup { - startConnecting() + start() + handleInitialNodesDiscovery() val requestSender = TestProbe() requestSender.send(peerManager, GetPeers) requestSender.expectMsgClass(classOf[Peers]) - - system.terminate() } it should "handle common message about sending message to peer" in new TestSetup { - startConnecting() + start() + handleInitialNodesDiscovery() - val probe: TestProbe = createdPeers(1) + val probe: TestProbe = createdPeers(1).probe probe.expectMsgClass(classOf[PeerActor.ConnectTo]) @@ -164,16 +163,71 @@ class PeerManagerSpec extends AnyFlatSpec with Matchers with Eventually with Nor peerManager ! SendMessage(block, PeerId(probe.ref.path.name)) probe.expectMsg(PeerActor.SendMessage(block)) + } + + it should "disconnect from incoming peers already handshaked" in new TestSetup { + start() + handleInitialNodesDiscovery() + + // Finish handshake with the first of the bootstrap peers + val TestPeer(peerAsOutgoing, peerAsOutgoingProbe) = createdPeers.head + + val ConnectTo(uriConnectedTo) = peerAsOutgoingProbe.expectMsgClass(classOf[PeerActor.ConnectTo]) + val nodeId = ByteString(Hex.decode(uriConnectedTo.getUserInfo)) - system.terminate() + peerAsOutgoingProbe.reply(PeerEvent.PeerHandshakeSuccessful(peerAsOutgoing.copy(nodeId = Some(nodeId)), initialPeerInfo)) + + createdPeers(1).probe.expectMsgClass(classOf[PeerActor.ConnectTo]) + + // Repeated incoming connection from one of the bootstrap peers + val peerAsIncomingTcpConnection = incomingConnection1 + val peerAsIncomingAddress = incomingPeerAddress1 + + peerManager ! PeerManagerActor.HandlePeerConnection(peerAsIncomingTcpConnection.ref, peerAsIncomingAddress) + + val peerAsIncomingProbe = createdPeers.last.probe + val peerAsIncoming = Peer(peerAsIncomingAddress, peerAsIncomingProbe.ref, incomingConnection = true, Some(nodeId)) + + peerAsIncomingProbe.expectMsg(PeerActor.HandleConnection(peerAsIncomingTcpConnection.ref, peerAsIncoming.remoteAddress)) + peerAsIncomingProbe.reply(PeerEvent.PeerHandshakeSuccessful(peerAsIncoming, initialPeerInfo)) + + peerAsIncomingProbe.expectMsg(PeerActor.DisconnectPeer(Disconnect.Reasons.AlreadyConnected)) } - trait TestSetup { - implicit lazy val system: ActorSystem = ActorSystem("PeerManagerActorSpec_System") + it should "disconnect from outgoing peer if, while it was pending, the same peer hanshaked as incoming" in new TestSetup { + start() + handleInitialNodesDiscovery() + + // Keep both bootstrap peers as pending + val TestPeer(peerAsOutgoing, peerAsOutgoingProbe) = createdPeers.head + + val ConnectTo(uriConnectedTo) = peerAsOutgoingProbe.expectMsgClass(classOf[PeerActor.ConnectTo]) + val nodeId = ByteString(Hex.decode(uriConnectedTo.getUserInfo)) + createdPeers(1).probe.expectMsgClass(classOf[PeerActor.ConnectTo]) + + // Receive incoming connection from one of the bootstrap peers + val peerAsIncomingTcpConnection = incomingConnection1 + val peerAsIncomingAddress = incomingPeerAddress1 + + peerManager ! PeerManagerActor.HandlePeerConnection(peerAsIncomingTcpConnection.ref, peerAsIncomingAddress) + + val peerAsIncomingProbe = createdPeers.last.probe + val peerAsIncoming = Peer(peerAsIncomingAddress, peerAsIncomingProbe.ref, incomingConnection = true, Some(nodeId)) + + peerAsIncomingProbe.expectMsg(PeerActor.HandleConnection(peerAsIncomingTcpConnection.ref, peerAsIncoming.remoteAddress)) + peerAsIncomingProbe.reply(PeerEvent.PeerHandshakeSuccessful(peerAsIncoming, initialPeerInfo)) + + // Handshake with peer as outgoing is finished + peerAsOutgoingProbe.reply(PeerEvent.PeerHandshakeSuccessful(peerAsOutgoing.copy(nodeId = Some(nodeId)), initialPeerInfo)) + peerAsOutgoingProbe.expectMsg(PeerActor.DisconnectPeer(Disconnect.Reasons.AlreadyConnected)) + } + + trait TestSetup { val time = new VirtualTime - var createdPeers: Seq[TestProbe] = Seq.empty + case class TestPeer(peer: Peer, probe: TestProbe) + var createdPeers: Seq[TestPeer] = Seq.empty val peerConfiguration: PeerConfiguration = Config.Network.peer val discoveryConfig = DiscoveryConfig(Config.config, Config.blockchains.blockchainConfig.bootstrapNodes) @@ -187,18 +241,21 @@ class PeerManagerSpec extends AnyFlatSpec with Matchers with Eventually with Nor .map(PeerDiscoveryManager.DiscoveryNodeInfo.fromNode) val knownNodes: Set[URI] = Set.empty - val peerFactory: (ActorContext, InetSocketAddress, Boolean) => ActorRef = { (ctx, addr, _) => - val peer = TestProbe() - createdPeers :+= peer - peer.ref + val peerFactory: (ActorContext, InetSocketAddress, Boolean) => ActorRef = { (_, address, isIncoming) => + val peerProbe = TestProbe() + createdPeers :+= TestPeer(Peer(address, peerProbe.ref, isIncoming), peerProbe) + peerProbe.ref } - val incomingConnection1 = TestProbe() - val incomingConnection2 = TestProbe() - val incomingConnection3 = TestProbe() val port = 30340 + val incomingConnection1 = TestProbe() + val incomingNodeId1 = ByteString(1) val incomingPeerAddress1 = new InetSocketAddress("127.0.0.2", port) + val incomingConnection2 = TestProbe() + val incomingNodeId2 = ByteString(2) val incomingPeerAddress2 = new InetSocketAddress("127.0.0.3", port) + val incomingConnection3 = TestProbe() + val incomingNodeId3 = ByteString(3) val incomingPeerAddress3 = new InetSocketAddress("127.0.0.4", port) val peerStatus = Status( @@ -231,12 +288,15 @@ class PeerManagerSpec extends AnyFlatSpec with Matchers with Eventually with Nor ) )(system) - def startConnecting(): Unit = { + def start(): Unit = { + peerEventBus.expectMsg(Subscribe(PeerHandshaked)) + peerManager ! PeerManagerActor.StartConnecting + } + def handleInitialNodesDiscovery(): Unit = { time.advance(6000) // wait for bootstrap nodes scan - peerEventBus.expectMsg(Subscribe(PeerHandshaked)) peerDiscoveryManager.expectMsg(PeerDiscoveryManager.GetDiscoveredNodesInfo) peerDiscoveryManager.reply(PeerDiscoveryManager.DiscoveredNodesInfo(bootstrapNodes)) knownNodesManager.expectMsg(KnownNodesManager.GetKnownNodes) From 2e1fc09a066d88eeef0b2f44d318cd7322062547 Mon Sep 17 00:00:00 2001 From: Nicolas Tallar Date: Wed, 4 Nov 2020 13:22:08 -0300 Subject: [PATCH 2/2] [ETCM-178] Added further lazy vals to ConnectedPeers --- .../scala/io/iohk/ethereum/network/ConnectedPeers.scala | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/main/scala/io/iohk/ethereum/network/ConnectedPeers.scala b/src/main/scala/io/iohk/ethereum/network/ConnectedPeers.scala index df4ef21f53..b334b64cea 100644 --- a/src/main/scala/io/iohk/ethereum/network/ConnectedPeers.scala +++ b/src/main/scala/io/iohk/ethereum/network/ConnectedPeers.scala @@ -16,8 +16,9 @@ case class ConnectedPeers( private lazy val allPeers: Map[PeerId, Peer] = outgoingPendingPeers ++ handshakedPeers ++ incomingPendingPeers + private lazy val allPeersRemoteAddresses: Set[InetSocketAddress] = allPeers.values.map(_.remoteAddress).toSet def isConnectionHandled(remoteAddress: InetSocketAddress): Boolean = - allPeers.values.map(_.remoteAddress).toSet.contains(remoteAddress) + allPeersRemoteAddresses.contains(remoteAddress) /* We have the node id of our outgoing pending peers so we could use that in our checks, by rejecting a peer that @@ -25,8 +26,9 @@ case class ConnectedPeers( However, with checking the node id of only handshaked peers we prioritize handshaked peers over pending ones, in the above mentioned case the repeated pending peer connection will eventually die out */ + private lazy val handshakedPeersNodeIds: Set[ByteString] = handshakedPeers.values.flatMap(_.nodeId).toSet def hasHandshakedWith(nodeId: ByteString): Boolean = - handshakedPeers.values.flatMap(_.nodeId).toSet.contains(nodeId) + handshakedPeersNodeIds.contains(nodeId) lazy val incomingPendingPeersCount: Int = incomingPendingPeers.size lazy val incomingHandshakedPeersCount: Int = handshakedPeers.count { case (_, p) => p.incomingConnection }