diff --git a/src/main/scala/io/iohk/ethereum/blockchain/sync/PivotBlockSelector.scala b/src/main/scala/io/iohk/ethereum/blockchain/sync/PivotBlockSelector.scala index 6d8bec1d91..2c3f10e93b 100644 --- a/src/main/scala/io/iohk/ethereum/blockchain/sync/PivotBlockSelector.scala +++ b/src/main/scala/io/iohk/ethereum/blockchain/sync/PivotBlockSelector.scala @@ -109,13 +109,15 @@ class PivotBlockSelector( timeout: Cancellable, headers: Map[ByteString, BlockHeaderWithVotes] ): Unit = { - val BlockHeaderWithVotes(mostPopularBlockHeader, updatedVotes) = headers.mostVotedHeader + // most voted header can return empty if we asked one peer and it returned us non expected block. Then headers map is empty + // so there is no most voted header + val maybeBlockHeaderWithVotes = headers.mostVotedHeader // All peers responded - consensus reached - if (peersToAsk.isEmpty && updatedVotes >= minPeersToChoosePivotBlock) { + if (peersToAsk.isEmpty && maybeBlockHeaderWithVotes.exists(hWv => hWv.votes >= minPeersToChoosePivotBlock)) { timeout.cancel() - sendResponseAndCleanup(mostPopularBlockHeader) + sendResponseAndCleanup(maybeBlockHeaderWithVotes.get.header) // Consensus could not be reached - ask additional peer if available - } else if (!isPossibleToReachConsensus(peersToAsk.size, updatedVotes)) { + } else if (!isPossibleToReachConsensus(peersToAsk.size, maybeBlockHeaderWithVotes.map(_.votes).getOrElse(0))) { timeout.cancel() if (waitingPeers.nonEmpty) { // There are more peers to ask val newTimeout = scheduler.scheduleOnce(peerResponseTimeout, self, ElectionPivotBlockTimeout) @@ -206,11 +208,11 @@ object PivotBlockSelector { case class BlockHeaderWithVotes(header: BlockHeader, votes: Int = 1) { def vote: BlockHeaderWithVotes = copy(votes = votes + 1) } - + import cats.implicits._ implicit class SortableHeadersMap(headers: Map[ByteString, BlockHeaderWithVotes]) { - def mostVotedHeader: BlockHeaderWithVotes = headers.maxBy { case (_, headerWithVotes) => - headerWithVotes.votes - }._2 + def mostVotedHeader: Option[BlockHeaderWithVotes] = { + headers.toList.maximumByOption { case (_, headerWithVotes) => headerWithVotes.votes }.map(_._2) + } } case class ElectionDetails(participants: List[Peer], expectedPivotBlock: BigInt) { diff --git a/src/test/scala/io/iohk/ethereum/blockchain/sync/PivotBlockSelectorSpec.scala b/src/test/scala/io/iohk/ethereum/blockchain/sync/PivotBlockSelectorSpec.scala index 7195ca01d4..9d4efc7638 100644 --- a/src/test/scala/io/iohk/ethereum/blockchain/sync/PivotBlockSelectorSpec.scala +++ b/src/test/scala/io/iohk/ethereum/blockchain/sync/PivotBlockSelectorSpec.scala @@ -188,6 +188,37 @@ class PivotBlockSelectorSpec ) } + it should "handle case when one peer responded with wrong block header" in new TestSetup { + override def minPeersToChoosePivotBlock: Int = 1 + + updateHandshakedPeers(HandshakedPeers(singlePeer)) + + pivotBlockSelector ! SelectPivotBlock + + peerMessageBus.expectMsgAllOf( + Subscribe(MessageClassifier(Set(BlockHeaders.code), PeerSelector.WithId(peer1.id))) + ) + + etcPeerManager.expectMsgAllOf( + EtcPeerManagerActor.SendMessage(GetBlockHeaders(Left(expectedPivotBlock), 1, 0, reverse = false), peer1.id) + ) + + // peer responds with block header number + pivotBlockSelector ! MessageFromPeer( + BlockHeaders(Seq(pivotBlockHeader.copy(number = expectedPivotBlock + 1))), + peer1.id + ) + + peerMessageBus.expectMsgAllOf( + Unsubscribe(MessageClassifier(Set(BlockHeaders.code), PeerSelector.WithId(peer1.id))), + Unsubscribe() + ) + time.advance(syncConfig.syncRetryInterval) + + fastSync.expectNoMessage() // consensus not reached - process have to be repeated + peerMessageBus.expectNoMessage() + } + it should "not ask additional peers if not needed" in new TestSetup { override val minPeersToChoosePivotBlock = 2 override val peersToChoosePivotBlockMargin = 1