Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -109,13 +109,15 @@ class PivotBlockSelector(
timeout: Cancellable,
headers: Map[ByteString, BlockHeaderWithVotes]
): Unit = {
val BlockHeaderWithVotes(mostPopularBlockHeader, updatedVotes) = headers.mostVotedHeader
// most voted header can return empty if we asked one peer and it returned us non expected block. Then headers map is empty
// so there is no most voted header
val maybeBlockHeaderWithVotes = headers.mostVotedHeader
// All peers responded - consensus reached
if (peersToAsk.isEmpty && updatedVotes >= minPeersToChoosePivotBlock) {
if (peersToAsk.isEmpty && maybeBlockHeaderWithVotes.exists(hWv => hWv.votes >= minPeersToChoosePivotBlock)) {
timeout.cancel()
sendResponseAndCleanup(mostPopularBlockHeader)
sendResponseAndCleanup(maybeBlockHeaderWithVotes.get.header)
// Consensus could not be reached - ask additional peer if available
} else if (!isPossibleToReachConsensus(peersToAsk.size, updatedVotes)) {
} else if (!isPossibleToReachConsensus(peersToAsk.size, maybeBlockHeaderWithVotes.map(_.votes).getOrElse(0))) {
timeout.cancel()
if (waitingPeers.nonEmpty) { // There are more peers to ask
val newTimeout = scheduler.scheduleOnce(peerResponseTimeout, self, ElectionPivotBlockTimeout)
Expand Down Expand Up @@ -206,11 +208,11 @@ object PivotBlockSelector {
case class BlockHeaderWithVotes(header: BlockHeader, votes: Int = 1) {
def vote: BlockHeaderWithVotes = copy(votes = votes + 1)
}

import cats.implicits._
implicit class SortableHeadersMap(headers: Map[ByteString, BlockHeaderWithVotes]) {
def mostVotedHeader: BlockHeaderWithVotes = headers.maxBy { case (_, headerWithVotes) =>
headerWithVotes.votes
}._2
def mostVotedHeader: Option[BlockHeaderWithVotes] = {
headers.toList.maximumByOption { case (_, headerWithVotes) => headerWithVotes.votes }.map(_._2)
}
}

case class ElectionDetails(participants: List[Peer], expectedPivotBlock: BigInt) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,37 @@ class PivotBlockSelectorSpec
)
}

it should "handle case when one peer responded with wrong block header" in new TestSetup {
override def minPeersToChoosePivotBlock: Int = 1

updateHandshakedPeers(HandshakedPeers(singlePeer))

pivotBlockSelector ! SelectPivotBlock

peerMessageBus.expectMsgAllOf(
Subscribe(MessageClassifier(Set(BlockHeaders.code), PeerSelector.WithId(peer1.id)))
)

etcPeerManager.expectMsgAllOf(
EtcPeerManagerActor.SendMessage(GetBlockHeaders(Left(expectedPivotBlock), 1, 0, reverse = false), peer1.id)
)

// peer responds with block header number
pivotBlockSelector ! MessageFromPeer(
BlockHeaders(Seq(pivotBlockHeader.copy(number = expectedPivotBlock + 1))),
peer1.id
)

peerMessageBus.expectMsgAllOf(
Unsubscribe(MessageClassifier(Set(BlockHeaders.code), PeerSelector.WithId(peer1.id))),
Unsubscribe()
)
time.advance(syncConfig.syncRetryInterval)

fastSync.expectNoMessage() // consensus not reached - process have to be repeated
peerMessageBus.expectNoMessage()
}

it should "not ask additional peers if not needed" in new TestSetup {
override val minPeersToChoosePivotBlock = 2
override val peersToChoosePivotBlockMargin = 1
Expand Down