diff --git a/src/ets/scala/io/iohk/ethereum/ets/blockchain/EthashTestBlockHeaderValidator.scala b/src/ets/scala/io/iohk/ethereum/ets/blockchain/EthashTestBlockHeaderValidator.scala
index 9c7d8eab62..9e91a37c45 100644
--- a/src/ets/scala/io/iohk/ethereum/ets/blockchain/EthashTestBlockHeaderValidator.scala
+++ b/src/ets/scala/io/iohk/ethereum/ets/blockchain/EthashTestBlockHeaderValidator.scala
@@ -15,7 +15,7 @@ class EthashTestBlockHeaderValidator(blockchainConfig: BlockchainConfig) extends
protected def difficulty: DifficultyCalculator = DifficultyCalculator(blockchainConfig)
- def validateEvenMore(blockHeader: BlockHeader, parentHeader: BlockHeader): Either[BlockHeaderError, BlockHeaderValid] =
+ override def validateEvenMore(blockHeader: BlockHeader): Either[BlockHeaderError, BlockHeaderValid] =
Right(BlockHeaderValid)
}
diff --git a/src/it/scala/io/iohk/ethereum/sync/FastSyncItSpec.scala b/src/it/scala/io/iohk/ethereum/sync/FastSyncItSpec.scala
index f6aab38782..f00d619732 100644
--- a/src/it/scala/io/iohk/ethereum/sync/FastSyncItSpec.scala
+++ b/src/it/scala/io/iohk/ethereum/sync/FastSyncItSpec.scala
@@ -59,24 +59,26 @@ class FastSyncItSpec extends FlatSpecBase with Matchers with BeforeAndAfterAll {
it should "sync blockchain with state nodes when peer do not response with full responses" in
customTestCaseResourceM(
- FakePeer.start3FakePeersRes(
+ FakePeer.start4FakePeersRes(
fakePeerCustomConfig2 = FakePeerCustomConfig(HostConfig()),
fakePeerCustomConfig3 = FakePeerCustomConfig(HostConfig())
)
- ) { case (peer1, peer2, peer3) =>
+ ) { case (peer1, peer2, peer3, peer4) =>
for {
_ <- peer2.importBlocksUntil(1000)(updateStateAtBlock(500))
_ <- peer3.importBlocksUntil(1000)(updateStateAtBlock(500))
- _ <- peer1.connectToPeers(Set(peer2.node, peer3.node))
+ _ <- peer4.importBlocksUntil(1000)(updateStateAtBlock(500))
+
+ _ <- peer1.connectToPeers(Set(peer2.node, peer3.node, peer4.node))
_ <- peer1.startFastSync().delayExecution(50.milliseconds)
_ <- peer1.waitForFastSyncFinish()
} yield {
val trie = peer1.getBestBlockTrie()
val synchronizingPeerHaveAllData = peer1.containsExpectedDataUpToAccountAtBlock(1000, 500)
- // due to the fact that function generating state is deterministic both peer2 and peer3 ends up with exactly same
+ // due to the fact that the state-generating function is deterministic, both peer3 and peer4 end up with exactly the same
// state, so peer1 can get whole trie from both of them.
- assert(peer1.bl.getBestBlockNumber() == peer2.bl.getBestBlockNumber() - peer2.testSyncConfig.pivotBlockOffset)
assert(peer1.bl.getBestBlockNumber() == peer3.bl.getBestBlockNumber() - peer3.testSyncConfig.pivotBlockOffset)
+ assert(peer1.bl.getBestBlockNumber() == peer4.bl.getBestBlockNumber() - peer4.testSyncConfig.pivotBlockOffset)
assert(trie.isDefined)
assert(synchronizingPeerHaveAllData)
}
@@ -84,24 +86,26 @@ class FastSyncItSpec extends FlatSpecBase with Matchers with BeforeAndAfterAll {
it should "sync blockchain with state nodes when one of the peers send empty state responses" in
customTestCaseResourceM(
- FakePeer.start3FakePeersRes(
+ FakePeer.start4FakePeersRes(
fakePeerCustomConfig2 = FakePeerCustomConfig(HostConfig()),
fakePeerCustomConfig3 = FakePeerCustomConfig(HostConfig().copy(maxMptComponentsPerMessage = 0))
)
- ) { case (peer1, peer2, peer3) =>
+ ) { case (peer1, peer2, peer3, peer4) =>
for {
_ <- peer2.importBlocksUntil(1000)(updateStateAtBlock(500))
_ <- peer3.importBlocksUntil(1000)(updateStateAtBlock(500))
- _ <- peer1.connectToPeers(Set(peer2.node, peer3.node))
+ _ <- peer4.importBlocksUntil(1000)(updateStateAtBlock(500))
+
+ _ <- peer1.connectToPeers(Set(peer2.node, peer3.node, peer4.node))
_ <- peer1.startFastSync().delayExecution(50.milliseconds)
_ <- peer1.waitForFastSyncFinish()
} yield {
val trie = peer1.getBestBlockTrie()
val synchronizingPeerHaveAllData = peer1.containsExpectedDataUpToAccountAtBlock(1000, 500)
- // due to the fact that function generating state is deterministic both peer2 and peer3 ends up with exactly same
+ // due to the fact that the state-generating function is deterministic, both peer3 and peer4 end up with exactly the same
// state, so peer1 can get whole trie from both of them.
- assert(peer1.bl.getBestBlockNumber() == peer2.bl.getBestBlockNumber() - peer2.testSyncConfig.pivotBlockOffset)
assert(peer1.bl.getBestBlockNumber() == peer3.bl.getBestBlockNumber() - peer3.testSyncConfig.pivotBlockOffset)
+ assert(peer1.bl.getBestBlockNumber() == peer4.bl.getBestBlockNumber() - peer4.testSyncConfig.pivotBlockOffset)
assert(trie.isDefined)
assert(synchronizingPeerHaveAllData)
}
diff --git a/src/it/scala/io/iohk/ethereum/sync/util/CommonFakePeer.scala b/src/it/scala/io/iohk/ethereum/sync/util/CommonFakePeer.scala
index afba56b079..5fed72d149 100644
--- a/src/it/scala/io/iohk/ethereum/sync/util/CommonFakePeer.scala
+++ b/src/it/scala/io/iohk/ethereum/sync/util/CommonFakePeer.scala
@@ -222,7 +222,8 @@ abstract class CommonFakePeer(peerName: String, fakePeerCustomConfig: FakePeerCu
startRetryInterval = 50.milliseconds,
nodesPerRequest = 200,
maxTargetDifference = 1,
- syncRetryInterval = 50.milliseconds
+ syncRetryInterval = 50.milliseconds,
+ blacklistDuration = 100.seconds
)
lazy val broadcaster = new BlockBroadcast(etcPeerManager)
diff --git a/src/main/resources/application.conf b/src/main/resources/application.conf
index 29f396f731..7dfc4e395a 100644
--- a/src/main/resources/application.conf
+++ b/src/main/resources/application.conf
@@ -369,7 +369,9 @@ mantis {
max-concurrent-requests = 50
# Requested number of block headers when syncing from other peers
- block-headers-per-request = 200
+ # Will cause an error if it's higher than the max-blocks-headers-per-message of the peer we're requesting from,
+ # so this number should not be set too high.
+ block-headers-per-request = 100
# Requested number of block bodies when syncing from other peers
block-bodies-per-request = 128
@@ -469,6 +471,10 @@ mantis {
# (peer.bestKnownBlock - pivot-block-offset) - node.currentPivotBlock > max-pivot-age
# then the fast sync pivot block has become stale and needs to be updated
max-pivot-block-age = 96
+
+ # Maximum number of retries performed by fast sync when the master peer sends invalid block headers.
+ # When this limit is reached, branch resolution is performed.
+ fast-sync-max-batch-retries = 5
}
pruning {
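The two new sync settings above (block-headers-per-request and fast-sync-max-batch-retries) end up as plain integer values on the sync configuration. A minimal, self-contained sketch (hypothetical wiring, not Mantis's actual SyncConfig loader) of how such keys are read with Typesafe Config:

```scala
import com.typesafe.config.ConfigFactory

object SyncConfigSketch {
  // Hypothetical holder; field names mirror the keys added above.
  final case class FastSyncTuning(blockHeadersPerRequest: Int, fastSyncMaxBatchRetries: Int)

  def main(args: Array[String]): Unit = {
    // Stand-in for the relevant slice of application.conf; values match the diff above.
    val cfg = ConfigFactory.parseString(
      """block-headers-per-request = 100
        |fast-sync-max-batch-retries = 5
        |""".stripMargin
    )
    val tuning = FastSyncTuning(
      blockHeadersPerRequest = cfg.getInt("block-headers-per-request"),
      fastSyncMaxBatchRetries = cfg.getInt("fast-sync-max-batch-retries")
    )
    println(tuning) // FastSyncTuning(100,5)
  }
}
```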
diff --git a/src/main/scala/io/iohk/ethereum/blockchain/sync/Blacklist.scala b/src/main/scala/io/iohk/ethereum/blockchain/sync/Blacklist.scala
index 9dcbcc9959..7fd571de93 100644
--- a/src/main/scala/io/iohk/ethereum/blockchain/sync/Blacklist.scala
+++ b/src/main/scala/io/iohk/ethereum/blockchain/sync/Blacklist.scala
@@ -93,7 +93,7 @@ object Blacklist {
case object WrongBlockHeaders extends BlacklistReason {
val reasonType: BlacklistReasonType = WrongBlockHeadersType
- val description: String = "Wrong blockheaders response (empty or not chain forming)"
+ val description: String = "Wrong blockheaders response: Peer didn't respond with requested block headers."
}
case object BlockHeaderValidationFailed extends BlacklistReason {
val reasonType: BlacklistReasonType = BlockHeaderValidationFailedType
diff --git a/src/main/scala/io/iohk/ethereum/blockchain/sync/PeerListSupportNg.scala b/src/main/scala/io/iohk/ethereum/blockchain/sync/PeerListSupportNg.scala
index 0bca267ce1..f9e3a7d33f 100644
--- a/src/main/scala/io/iohk/ethereum/blockchain/sync/PeerListSupportNg.scala
+++ b/src/main/scala/io/iohk/ethereum/blockchain/sync/PeerListSupportNg.scala
@@ -17,6 +17,8 @@ trait PeerListSupportNg { self: Actor with ActorLogging =>
private implicit val ec: ExecutionContext = context.dispatcher
+ protected val bigIntReverseOrdering: Ordering[BigInt] = Ordering[BigInt].reverse
+
def etcPeerManager: ActorRef
def peerEventBus: ActorRef
def blacklist: Blacklist
@@ -44,6 +46,9 @@ trait PeerListSupportNg { self: Actor with ActorLogging =>
def getPeerById(peerId: PeerId): Option[Peer] = handshakedPeers.get(peerId).map(_.peer)
+ def getPeerWithHighestBlock: Option[PeerWithInfo] =
+ peersToDownloadFrom.values.toList.sortBy(_.peerInfo.maxBlockNumber)(bigIntReverseOrdering).headOption
+
def blacklistIfHandshaked(peerId: PeerId, duration: FiniteDuration, reason: BlacklistReason): Unit =
handshakedPeers.get(peerId).foreach(_ => blacklist.add(peerId, duration, reason))
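For reference, the peer selection added above boils down to "highest reported block wins". A self-contained sketch with a stand-in PeerLike type (the real code uses PeerWithInfo); on Scala 2.13+ the same result could also be obtained with maxByOption:

```scala
object HighestBlockPeerSketch {
  // Stand-in for PeerWithInfo: only the field the selection cares about.
  final case class PeerLike(name: String, maxBlockNumber: BigInt)

  def getPeerWithHighestBlock(peers: List[PeerLike]): Option[PeerLike] =
    peers.sortBy(_.maxBlockNumber)(Ordering[BigInt].reverse).headOption
    // equivalently: peers.maxByOption(_.maxBlockNumber)

  def main(args: Array[String]): Unit = {
    val peers = List(PeerLike("a", 100), PeerLike("b", 250), PeerLike("c", 90))
    println(getPeerWithHighestBlock(peers)) // Some(PeerLike(b,250))
    println(getPeerWithHighestBlock(Nil))   // None
  }
}
```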
diff --git a/src/main/scala/io/iohk/ethereum/blockchain/sync/PeerRequestHandler.scala b/src/main/scala/io/iohk/ethereum/blockchain/sync/PeerRequestHandler.scala
index b0e82650d3..bb99936bd8 100644
--- a/src/main/scala/io/iohk/ethereum/blockchain/sync/PeerRequestHandler.scala
+++ b/src/main/scala/io/iohk/ethereum/blockchain/sync/PeerRequestHandler.scala
@@ -24,15 +24,15 @@ class PeerRequestHandler[RequestMsg <: Message, ResponseMsg <: Message: ClassTag
import PeerRequestHandler._
- val initiator: ActorRef = context.parent
+ private val initiator: ActorRef = context.parent
- val timeout: Cancellable = scheduler.scheduleOnce(responseTimeout, self, Timeout)
+ private val timeout: Cancellable = scheduler.scheduleOnce(responseTimeout, self, Timeout)
- val startTime: Long = System.currentTimeMillis()
+ private val startTime: Long = System.currentTimeMillis()
private def subscribeMessageClassifier = MessageClassifier(Set(responseMsgCode), PeerSelector.WithId(peer.id))
- def timeTakenSoFar(): Long = System.currentTimeMillis() - startTime
+ private def timeTakenSoFar(): Long = System.currentTimeMillis() - startTime
override def preStart(): Unit = {
etcPeerManager ! EtcPeerManagerActor.SendMessage(toSerializable(requestMsg), peer.id)
@@ -79,8 +79,8 @@ object PeerRequestHandler {
)(implicit scheduler: Scheduler, toSerializable: RequestMsg => MessageSerializable): Props =
Props(new PeerRequestHandler(peer, responseTimeout, etcPeerManager, peerEventBus, requestMsg, responseMsgCode))
- case class RequestFailed(peer: Peer, reason: String)
- case class ResponseReceived[T](peer: Peer, response: T, timeTaken: Long)
+ final case class RequestFailed(peer: Peer, reason: String)
+ final case class ResponseReceived[T](peer: Peer, response: T, timeTaken: Long)
private case object Timeout
}
diff --git a/src/main/scala/io/iohk/ethereum/blockchain/sync/fast/FastSync.scala b/src/main/scala/io/iohk/ethereum/blockchain/sync/fast/FastSync.scala
index 3ffa6adec6..86eb0b28db 100644
--- a/src/main/scala/io/iohk/ethereum/blockchain/sync/fast/FastSync.scala
+++ b/src/main/scala/io/iohk/ethereum/blockchain/sync/fast/FastSync.scala
@@ -1,15 +1,17 @@
package io.iohk.ethereum.blockchain.sync.fast
-import java.time.Instant
import akka.actor._
import akka.util.ByteString
+import cats.NonEmptyTraverse.ops.toAllNonEmptyTraverseOps
import cats.data.NonEmptyList
+import cats.implicits._
+import io.iohk.ethereum.blockchain.sync.Blacklist.BlacklistReason._
+import io.iohk.ethereum.blockchain.sync.Blacklist._
import io.iohk.ethereum.blockchain.sync.PeerListSupportNg.PeerWithInfo
import io.iohk.ethereum.blockchain.sync.PeerRequestHandler.ResponseReceived
import io.iohk.ethereum.blockchain.sync.SyncProtocol.Status.Progress
import io.iohk.ethereum.blockchain.sync._
-import io.iohk.ethereum.blockchain.sync.Blacklist._
-import io.iohk.ethereum.blockchain.sync.Blacklist.BlacklistReason._
+import io.iohk.ethereum.blockchain.sync.fast.HeaderSkeleton._
import io.iohk.ethereum.blockchain.sync.fast.ReceiptsValidator.ReceiptsValidationResult
import io.iohk.ethereum.blockchain.sync.fast.SyncBlocksValidator.BlockBodyValidationResult
import io.iohk.ethereum.blockchain.sync.fast.SyncStateSchedulerActor.{
@@ -18,12 +20,12 @@ import io.iohk.ethereum.blockchain.sync.fast.SyncStateSchedulerActor.{
StateSyncFinished,
WaitingForNewTargetBlock
}
-import io.iohk.ethereum.consensus.validators.Validators
+import io.iohk.ethereum.consensus.validators.{BlockHeaderError, BlockHeaderValid, Validators}
import io.iohk.ethereum.db.storage.{AppStateStorage, FastSyncStateStorage}
import io.iohk.ethereum.domain._
import io.iohk.ethereum.mpt.MerklePatriciaTrie
import io.iohk.ethereum.network.EtcPeerManagerActor.PeerInfo
-import io.iohk.ethereum.network.{Peer, PeerId}
+import io.iohk.ethereum.network.Peer
import io.iohk.ethereum.network.p2p.messages.Codes
import io.iohk.ethereum.network.p2p.messages.PV62._
import io.iohk.ethereum.network.p2p.messages.PV63._
@@ -31,6 +33,7 @@ import io.iohk.ethereum.utils.ByteStringUtils
import io.iohk.ethereum.utils.Config.SyncConfig
import org.bouncycastle.util.encoders.Hex
+import java.time.Instant
import java.util.concurrent.TimeUnit
import scala.annotation.tailrec
import scala.concurrent.ExecutionContext.Implicits.global
@@ -113,24 +116,35 @@ class FastSync(
// scalastyle:off number.of.methods
private class SyncingHandler(initialSyncState: SyncState) {
- private val BlockHeadersHandlerName = "block-headers-request-handler"
// not part of syncState as we do not want to persist it.
private var stateSyncRestartRequested = false
- private var requestedHeaders: Map[Peer, BigInt] = Map.empty
+ private var requestedHeaders: Map[Peer, HeaderRange] = Map.empty
private var syncState = initialSyncState
private var assignedHandlers: Map[ActorRef, Peer] = Map.empty
private var peerRequestsTime: Map[Peer, Instant] = Map.empty
+ private var masterPeer: Option[Peer] = None
+ // TODO ETCM-701 get rid of state and move skeleton download to a separate actor
+ private var currentSkeletonState: Option[HeaderSkeleton] = None
+ private var skeletonHandler: Option[ActorRef] = None
+ private var batchFailuresCount = 0
+ private var blockHeadersQueue: Seq[HeaderRange] = Nil
+
private var requestedBlockBodies: Map[ActorRef, Seq[ByteString]] = Map.empty
private var requestedReceipts: Map[ActorRef, Seq[ByteString]] = Map.empty
private val syncStateStorageActor = context.actorOf(Props[StateStorageActor](), "state-storage")
-
syncStateStorageActor ! fastSyncStateStorage
+ private val branchResolver = context.actorOf(
+ FastSyncBranchResolverActor
+ .props(self, peerEventBus, etcPeerManager, blockchain, blacklist, syncConfig, appStateStorage, scheduler),
+ "fast-sync-branch-resolver"
+ )
+
private val syncStateScheduler = context.actorOf(
SyncStateSchedulerActor
.props(
@@ -170,25 +184,36 @@ class FastSync(
case ProcessSyncing => processSyncing()
case PrintStatus => printStatus()
case PersistSyncState => persistSyncState()
+ case r @ ResponseReceived(_, _, _) => handleResponses(r)
case StateSyncFinished =>
syncState = syncState.copy(stateSyncFinished = true)
processSyncing()
+ case PeerRequestHandler.RequestFailed(peer, reason) =>
+ handleRequestFailure(peer, sender(), RequestFailed(reason))
+ case Terminated(ref) if assignedHandlers.contains(ref) =>
+ handleRequestFailure(assignedHandlers(ref), ref, PeerActorTerminated)
+ }
+ // TODO ETCM-701 will be moved to separate actor and refactored
+ private def handleResponses: Receive = {
case ResponseReceived(peer, BlockHeaders(blockHeaders), timeTaken) =>
- log.info("*** Received {} block headers in {} ms ***", blockHeaders.size, timeTaken)
+ log.info(
+ "*** Received {} block headers from peer [{}] in {} ms ***",
+ blockHeaders.size,
+ peer.id,
+ timeTaken
+ )
FastSyncMetrics.setBlockHeadersDownloadTime(timeTaken)
-
- requestedHeaders.get(peer).foreach { requestedNum =>
- removeRequestHandler(sender())
- requestedHeaders -= peer
- if (
- blockHeaders.nonEmpty && blockHeaders.size <= requestedNum && blockHeaders.head.number == syncState.bestBlockHeaderNumber + 1
- )
- handleBlockHeaders(peer, blockHeaders)
- else
- blacklist.add(peer.id, blacklistDuration, WrongBlockHeaders)
+ currentSkeletonState match {
+ case Some(currentSkeleton) =>
+ if (skeletonHandler.contains(sender())) handleSkeletonResponse(peer, blockHeaders, currentSkeleton)
+ else handleHeaderBatchResponse(peer, blockHeaders, currentSkeleton)
+ case None =>
+ log.warning(
+ s"Received response to fill in header skeleton, but current header skeleton is not defined."
+ )
+ processSyncing()
}
-
case ResponseReceived(peer, BlockBodies(blockBodies), timeTaken) =>
log.info("Received {} block bodies in {} ms", blockBodies.size, timeTaken)
FastSyncMetrics.setBlockBodiesDownloadTime(timeTaken)
@@ -197,7 +222,6 @@ class FastSync(
requestedBlockBodies -= sender()
removeRequestHandler(sender())
handleBlockBodies(peer, requestedBodies, blockBodies)
-
case ResponseReceived(peer, Receipts(receipts), timeTaken) =>
log.info("Received {} receipts in {} ms", receipts.size, timeTaken)
FastSyncMetrics.setBlockReceiptsDownloadTime(timeTaken)
@@ -206,12 +230,149 @@ class FastSync(
requestedReceipts -= sender()
removeRequestHandler(sender())
handleReceipts(peer, requestedHashes, receipts)
+ }
- case PeerRequestHandler.RequestFailed(peer, reason) =>
- handleRequestFailure(peer, sender(), RequestFailed(reason))
+ private def handleSkeletonResponse(
+ peer: Peer,
+ blockHeaders: Seq[BlockHeader],
+ currentSkeleton: HeaderSkeleton
+ ): Unit = {
+ def validateDownloadedHeaders = blockHeaders.toList.traverse_(validateHeaderOnly)
+
+ log.info("Handling new received skeleton from peer [{}].", peer.id.value)
+
+ skeletonHandler.foreach(context.unwatch)
+ skeletonHandler = None
+
+ validateDownloadedHeaders match {
+ case Left(error) =>
+ log.info(s"Validation of skeleton from $peer failed: $error")
+ blockHeadersError(peer, BlacklistReason.BlockHeaderValidationFailed)
+ case Right(_) =>
+ currentSkeleton.setSkeletonHeaders(blockHeaders) match {
+ case Left(error) =>
+ // TODO ETCM-701 if this error keeps happening, switch master peer
+ log.warning("Failed to set skeleton headers from peer [{}]: [{}]", peer.id.value, error.msg)
+ requestSkeletonHeaders(peer)
+ case Right(updatedSkeleton) =>
+ log.debug(
+ "Updated current skeleton header. Included batches (starting numbers): [{}]",
+ updatedSkeleton.batchStartingHeaderNumbers.mkString(", ")
+ )
+ currentSkeletonState = Some(updatedSkeleton)
+ blockHeadersQueue ++= updatedSkeleton.batchStartingHeaderNumbers.map(from =>
+ HeaderRange(from, updatedSkeleton.batchSize)
+ )
+ }
+ }
+ }
- case Terminated(ref) if assignedHandlers.contains(ref) =>
- handleRequestFailure(assignedHandlers(ref), ref, PeerActorTerminated)
+ private def handleHeaderBatchResponse(
+ peer: Peer,
+ blockHeaders: Seq[BlockHeader],
+ currentSkeleton: HeaderSkeleton
+ ): Unit = {
+ def validHeadersChain(headers: Seq[BlockHeader], requestedNum: BigInt): Boolean = {
+ headers.nonEmpty && headers.size <= requestedNum && checkHeadersChain(headers)
+ }
+
+ removeRequestHandler(sender())
+ requestedHeaders.get(peer) match {
+ case Some(requested) =>
+ log.debug("Validating [{}] received block headers from peer [{}]", blockHeaders.size, peer.id.value)
+ requestedHeaders -= peer
+ if (validHeadersChain(blockHeaders, requested.limit))
+ fillSkeletonGap(peer, requested, blockHeaders, currentSkeleton)
+ else {
+ handleHeaderResponseError(
+ InvalidDownloadedChain(blockHeaders),
+ requested,
+ peer,
+ BlacklistReason.WrongBlockHeaders
+ )
+ }
+ case None => log.warning("Received block headers from peer [{}] but weren't expecting any.", peer.id.value)
+ }
+ }
+
+ private def fillSkeletonGap(
+ peer: Peer,
+ request: HeaderRange,
+ blockHeaders: Seq[BlockHeader],
+ currentSkeleton: HeaderSkeleton
+ ): Unit = {
+ log.debug(
+ "Attempting to use [{}] block headers from peer [{}] to fill in header skeleton.",
+ blockHeaders.size,
+ peer.id
+ )
+ currentSkeleton.addBatch(blockHeaders) match {
+ case Right(skeleton) =>
+ log.debug("Successfully added headers from peer [{}] to current skeleton.", peer.id.value)
+ skeleton.fullChain match {
+ case Some(fullChain) =>
+ log.debug("Current header skeleton completed. Starting to request bodies and receipts.")
+ handleBlockHeadersChain(peer, fullChain)
+ currentSkeletonState = None
+ case None =>
+ log.debug("Skeleton is still incomplete. Waiting for remaining headers.")
+ currentSkeletonState = Some(skeleton)
+ }
+ case Left(error) =>
+ log.warning("Failed to add headers from peer [{}] to current skeleton. Error: [{}]", peer.id.value, error.msg)
+ handleHeaderResponseError(error, request, peer, BlacklistReason.BlockHeaderValidationFailed)
+ }
+ }
+
+ private def handleHeaderResponseError(
+ error: HeaderSkeletonError,
+ request: HeaderRange,
+ peer: Peer,
+ reason: BlacklistReason
+ ): Unit = {
+ def handleMasterPeerFailure(header: BlockHeader): Unit = {
+ batchFailuresCount += 1
+ if (batchFailuresCount > fastSyncMaxBatchRetries) {
+ log.info("Max skeleton batch failures reached. Master peer must be wrong.")
+ handleRewind(header, masterPeer.get, fastSyncBlockValidationN, blacklistDuration)
+
+ // Start branch resolution and wait for response from the FastSyncBranchResolver actor.
+ context become waitingForBranchResolution
+ branchResolver ! FastSyncBranchResolverActor.StartBranchResolver
+ currentSkeletonState = None
+ }
+ }
+
+ blockHeadersQueue :+= request
+ error match {
+ // These are the reasons that make the master peer suspicious
+ case InvalidPenultimateHeader(_, header) => handleMasterPeerFailure(header)
+ case InvalidBatchHash(_, header) => handleMasterPeerFailure(header)
+ // Otherwise it's probably just this peer's fault
+ case _ =>
+ log.info(error.msg)
+ blockHeadersError(peer, reason)
+ }
+ }
+
+ private def waitingForBranchResolution: Receive = handleStatus orElse {
+ case FastSyncBranchResolverActor.BranchResolvedSuccessful(firstCommonBlockNumber, resolvedPeer) =>
+ // Reset the batch failures count
+ batchFailuresCount = 0
+
+ // Restart syncing from the valid block available in state.
+ syncState = syncState.copy(bestBlockHeaderNumber = firstCommonBlockNumber)
+ masterPeer = Some(resolvedPeer)
+ context become receive
+ processSyncing()
+ case _: FastSyncBranchResolverActor.BranchResolutionFailed =>
+ // there isn't much we can do if we don't find a branch/peer to continue syncing, so let's try again
+ branchResolver ! FastSyncBranchResolverActor.StartBranchResolver
+ }
+
+ private def blockHeadersError(peer: Peer, blacklistReason: BlacklistReason): Unit = {
+ blacklist.add(peer.id, blacklistDuration, blacklistReason)
+ processSyncing()
}
def askForPivotBlockUpdate(updateReason: PivotBlockUpdateReason): Unit = {
@@ -226,26 +387,14 @@ class FastSync(
context become waitingForPivotBlockUpdate(updateReason)
}
- def reScheduleAskForNewPivot(updateReason: PivotBlockUpdateReason): Unit = {
- syncState = syncState.copy(pivotBlockUpdateFailures = syncState.pivotBlockUpdateFailures + 1)
- scheduler
- .scheduleOnce(syncConfig.pivotBlockReScheduleInterval, self, UpdatePivotBlock(updateReason))
- }
-
- private def stalePivotAfterRestart(
- newPivot: BlockHeader,
- currentState: SyncState,
- updateReason: PivotBlockUpdateReason
- ): Boolean = {
- newPivot.number == currentState.pivotBlock.number && updateReason.isSyncRestart
- }
-
private def newPivotIsGoodEnough(
newPivot: BlockHeader,
currentState: SyncState,
updateReason: PivotBlockUpdateReason
): Boolean = {
- newPivot.number >= currentState.pivotBlock.number && !stalePivotAfterRestart(newPivot, currentState, updateReason)
+ def stalePivotAfterRestart: Boolean =
+ newPivot.number == currentState.pivotBlock.number && updateReason.isSyncRestart
+ newPivot.number >= currentState.pivotBlock.number && !stalePivotAfterRestart
}
def waitingForPivotBlockUpdate(updateReason: PivotBlockUpdateReason): Receive =
@@ -267,6 +416,12 @@ class FastSync(
case UpdatePivotBlock(state) => updatePivotBlock(state)
}
+ private def reScheduleAskForNewPivot(updateReason: PivotBlockUpdateReason): Unit = {
+ syncState = syncState.copy(pivotBlockUpdateFailures = syncState.pivotBlockUpdateFailures + 1)
+ scheduler
+ .scheduleOnce(syncConfig.pivotBlockReScheduleInterval, self, UpdatePivotBlock(updateReason))
+ }
+
def currentSyncingStatus: SyncProtocol.Status =
SyncProtocol.Status.Syncing(
initialSyncState.lastFullBlockNumber,
@@ -346,6 +501,7 @@ class FastSync(
assignedHandlers -= handler
}
+ // TODO [ETCM-676]: Move to blockchain and make sure it's atomic
private def discardLastBlocks(startBlock: BigInt, blocksToDiscard: Int): Unit = {
(startBlock to ((startBlock - blocksToDiscard) max 1) by -1).foreach { n =>
blockchain.getBlockHeaderByNumber(n).foreach { headerToRemove =>
@@ -356,24 +512,6 @@ class FastSync(
appStateStorage.putBestBlockNumber((startBlock - blocksToDiscard - 1) max 0).commit()
}
- @tailrec
- private def processHeaders(peer: Peer, headers: Seq[BlockHeader]): HeaderProcessingResult = {
- if (headers.nonEmpty) {
- val header = headers.head
- processHeader(header, peer) match {
- case Left(result) => result
- case Right((header, weight)) =>
- updateSyncState(header, weight)
- if (header.number == syncState.safeDownloadTarget) {
- ImportedPivotBlock
- } else {
- processHeaders(peer, headers.tail)
- }
- }
- } else
- HeadersProcessingFinished
- }
-
private def validateHeader(header: BlockHeader, peer: Peer): Either[HeaderProcessingResult, BlockHeader] = {
val shouldValidate = header.number >= syncState.nextBlockToFullyValidate
@@ -412,19 +550,6 @@ class FastSync(
syncState = syncState.updateNextBlockToValidate(header, K, X)
}
- private def processHeader(
- header: BlockHeader,
- peer: Peer
- ): Either[HeaderProcessingResult, (BlockHeader, ChainWeight)] =
- for {
- validatedHeader <- validateHeader(header, peer)
- parentWeight <- getParentChainWeight(header)
- } yield (validatedHeader, parentWeight)
-
- private def getParentChainWeight(header: BlockHeader) = {
- blockchain.getChainWeightByHash(header.parentHash).toRight(ParentChainWeightNotFound(header))
- }
-
private def handleRewind(header: BlockHeader, peer: Peer, N: Int, duration: FiniteDuration): Unit = {
blacklist.add(peer.id, duration, BlockHeaderValidationFailed)
if (header.number <= syncState.safeDownloadTarget) {
@@ -440,32 +565,56 @@ class FastSync(
}
}
- private def handleBlockHeaders(peer: Peer, headers: Seq[BlockHeader]): Unit = {
- if (checkHeadersChain(headers)) {
- processHeaders(peer, headers) match {
- case ParentChainWeightNotFound(header) =>
- // We could end in wrong fork and get blocked so we should rewind our state a little
- // we blacklist peer just in case we got malicious peer which would send us bad blocks, forcing us to rollback
- // to genesis
- log.warning("Parent chain weight not found for block {}, not processing rest of headers", header.idTag)
- handleRewind(header, peer, syncConfig.fastSyncBlockValidationN, syncConfig.blacklistDuration)
- case HeadersProcessingFinished =>
- processSyncing()
- case ImportedPivotBlock =>
- updatePivotBlock(ImportedLastBlock)
- case ValidationFailed(header, peerToBlackList) =>
- log.warning("validation of header {} failed", header.idTag)
- // pow validation failure indicate that either peer is malicious or it is on wrong fork
- handleRewind(
- header,
- peerToBlackList,
- syncConfig.fastSyncBlockValidationN,
- syncConfig.criticalBlacklistDuration
- )
- }
- } else {
- blacklist.add(peer.id, blacklistDuration, ErrorInBlockHeaders)
- processSyncing()
+ // scalastyle:off method.length
+ private def handleBlockHeadersChain(peer: Peer, headers: Seq[BlockHeader]): Unit = {
+ def processHeader(header: BlockHeader): Either[HeaderProcessingResult, (BlockHeader, ChainWeight)] =
+ for {
+ validatedHeader <- validateHeader(header, peer)
+ parentWeight <- getParentChainWeight(header)
+ } yield (validatedHeader, parentWeight)
+
+ def getParentChainWeight(header: BlockHeader) = {
+ blockchain.getChainWeightByHash(header.parentHash).toRight(ParentChainWeightNotFound(header))
+ }
+
+ @tailrec
+ def processHeaders(headers: Seq[BlockHeader]): HeaderProcessingResult = {
+ if (headers.nonEmpty) {
+ val header = headers.head
+ processHeader(header) match {
+ case Left(result) => result
+ case Right((header, weight)) =>
+ updateSyncState(header, weight)
+ if (header.number == syncState.safeDownloadTarget) {
+ ImportedPivotBlock
+ } else {
+ processHeaders(headers.tail)
+ }
+ }
+ } else
+ HeadersProcessingFinished
+ }
+
+ processHeaders(headers) match {
+ case ParentChainWeightNotFound(header) =>
+ // We could have ended up on a wrong fork and gotten stuck, so we should rewind our state a little.
+ // We blacklist the peer just in case it is malicious and sent us bad blocks that would force us to roll back
+ // to genesis.
+ log.warning("Parent chain weight not found for block {}, not processing rest of headers", header.idTag)
+ handleRewind(header, peer, syncConfig.fastSyncBlockValidationN, syncConfig.blacklistDuration)
+ case HeadersProcessingFinished =>
+ processSyncing()
+ case ImportedPivotBlock =>
+ updatePivotBlock(ImportedLastBlock)
+ case ValidationFailed(header, peerToBlackList) =>
+ log.warning("validation of header {} failed", header.idTag)
+ // PoW validation failure indicates that the peer is either malicious or on a wrong fork
+ handleRewind(
+ header,
+ peerToBlackList,
+ syncConfig.fastSyncBlockValidationN,
+ syncConfig.criticalBlacklistDuration
+ )
}
}
@@ -711,30 +860,38 @@ class FastSync(
.filter(p => peerRequestsTime.get(p.peer).forall(d => d.plusMillis(fastSyncThrottle.toMillis).isBefore(now)))
peers
.take(maxConcurrentRequests - assignedHandlers.size)
- .sortBy(_.peerInfo.maxBlockNumber)(Ordering[BigInt].reverse)
+ .sortBy(_.peerInfo.maxBlockNumber)(bigIntReverseOrdering)
.foreach(assignBlockchainWork)
}
}
def assignBlockchainWork(peerWithInfo: PeerWithInfo): Unit = {
val PeerWithInfo(peer, peerInfo) = peerWithInfo
+ log.debug(s"Assigning blockchain work for peer [{}]", peer.id.value)
if (syncState.receiptsQueue.nonEmpty) {
requestReceipts(peer)
} else if (syncState.blockBodiesQueue.nonEmpty) {
requestBlockBodies(peer)
- } else if (
- requestedHeaders.isEmpty &&
- context.child(BlockHeadersHandlerName).isEmpty &&
- syncState.bestBlockHeaderNumber < syncState.safeDownloadTarget &&
- peerInfo.maxBlockNumber >= syncState.pivotBlock.number
- ) {
+ } else if (blockHeadersQueue.nonEmpty) {
requestBlockHeaders(peer)
+ } else if (shouldRequestNewSkeleton(peerInfo)) {
+ requestSkeletonHeaders(peer)
+ } else {
+ log.debug("Nothing to request. Waiting for responses for [{}] sent requests.", assignedHandlers.size)
}
}
+ private def shouldRequestNewSkeleton(peerInfo: PeerInfo): Boolean =
+ currentSkeletonState.isEmpty &&
+ skeletonHandler.isEmpty &&
+ syncState.bestBlockHeaderNumber < syncState.safeDownloadTarget &&
+ peerInfo.maxBlockNumber >= syncState.pivotBlock.number
+
def requestReceipts(peer: Peer): Unit = {
val (receiptsToGet, remainingReceipts) = syncState.receiptsQueue.splitAt(receiptsPerRequest)
+ log.debug("Requesting receipts from peer [{}]", peer.id.value)
+
val handler = context.actorOf(
PeerRequestHandler.props[GetReceipts, Receipts](
peer,
@@ -756,6 +913,8 @@ class FastSync(
def requestBlockBodies(peer: Peer): Unit = {
val (blockBodiesToGet, remainingBlockBodies) = syncState.blockBodiesQueue.splitAt(blockBodiesPerRequest)
+ log.debug("Requesting block bodies from peer [{}]", peer.id.value)
+
val handler = context.actorOf(
PeerRequestHandler.props[GetBlockBodies, BlockBodies](
peer,
@@ -775,11 +934,14 @@ class FastSync(
}
def requestBlockHeaders(peer: Peer): Unit = {
- val limit: BigInt =
- if (blockHeadersPerRequest < (syncState.safeDownloadTarget - syncState.bestBlockHeaderNumber))
- blockHeadersPerRequest
- else
- syncState.safeDownloadTarget - syncState.bestBlockHeaderNumber
+ val (request, remaining) = (blockHeadersQueue.head, blockHeadersQueue.tail)
+
+ log.debug(
+ "Requesting [{}] block headers starting at block header [{}] from peer [{}]",
+ request.limit,
+ request.from,
+ peer.id.value
+ )
val handler = context.actorOf(
PeerRequestHandler.props[GetBlockHeaders, BlockHeaders](
@@ -787,15 +949,57 @@ class FastSync(
peerResponseTimeout,
etcPeerManager,
peerEventBus,
- requestMsg = GetBlockHeaders(Left(syncState.bestBlockHeaderNumber + 1), limit, skip = 0, reverse = false),
+ requestMsg = GetBlockHeaders(Left(request.from), request.limit, skip = 0, reverse = false),
responseMsgCode = Codes.BlockHeadersCode
- ),
- BlockHeadersHandlerName
+ )
)
context watch handler
assignedHandlers += (handler -> peer)
- requestedHeaders += (peer -> limit)
+ requestedHeaders += (peer -> request)
+ blockHeadersQueue = remaining
+ peerRequestsTime += (peer -> Instant.now())
+ }
+
+ def requestSkeletonHeaders(peer: Peer): Unit = {
+ val skeleton =
+ HeaderSkeleton(syncState.bestBlockHeaderNumber + 1, syncState.safeDownloadTarget, blockHeadersPerRequest)
+
+ log.debug(
+ "Requesting header skeleton for range [{}-{}] from master peer [{}]",
+ skeleton.from,
+ skeleton.lastSkeletonHeaderNumber,
+ peer.id.value
+ )
+
+ val msg = GetBlockHeaders(
+ Left(skeleton.firstSkeletonHeaderNumber),
+ skeleton.limit,
+ skeleton.gapSize,
+ reverse = false
+ )
+
+ // Stick with the same peer (the best one) as the master peer as long as it is valid.
+ // See: branch resolution, where an invalid master peer is updated.
+ val peerToRequestFrom = masterPeer.getOrElse {
+ masterPeer = Some(peer)
+ peer
+ }
+
+ val handler = context.actorOf(
+ PeerRequestHandler.props[GetBlockHeaders, BlockHeaders](
+ peerToRequestFrom,
+ peerResponseTimeout,
+ etcPeerManager,
+ peerEventBus,
+ requestMsg = msg,
+ responseMsgCode = Codes.BlockHeadersCode
+ )
+ )
+
+ context watch handler
+ skeletonHandler = Some(handler)
+ currentSkeletonState = Some(skeleton)
peerRequestsTime += (peer -> Instant.now())
}
@@ -864,9 +1068,7 @@ object FastSync {
private case class UpdatePivotBlock(reason: PivotBlockUpdateReason)
private case object ProcessSyncing
-
- private[sync] case object PersistSyncState
-
+ private case object PersistSyncState
private case object PrintStatus
case class SyncState(
@@ -943,8 +1145,9 @@ object FastSync {
case SyncRestart => true
}
}
-
case object ImportedLastBlock extends PivotBlockUpdateReason
case object LastBlockValidationFailed extends PivotBlockUpdateReason
case object SyncRestart extends PivotBlockUpdateReason
+
+ private final case class HeaderRange(from: BigInt, limit: BigInt)
}
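To make the new skeleton-driven flow concrete: once a skeleton response is accepted, each gap becomes one HeaderRange(from, limit) entry in blockHeadersQueue, and requestBlockHeaders pops one entry per assigned peer. A minimal sketch (simplified names) using the numbers from the HeaderSkeleton scaladoc example further below:

```scala
object HeaderQueueSketch {
  // Mirrors FastSync's private HeaderRange(from, limit).
  final case class HeaderRange(from: BigInt, limit: BigInt)

  def main(args: Array[String]): Unit = {
    // From the HeaderSkeleton example: from = 0, to = 10, maxSkeletonHeaders = 3.
    val batchStartingHeaderNumbers = Seq(BigInt(0), BigInt(3), BigInt(6))
    val batchSize = BigInt(3)

    // Same shape as `blockHeadersQueue ++= updatedSkeleton.batchStartingHeaderNumbers.map(...)`.
    val blockHeadersQueue = batchStartingHeaderNumbers.map(from => HeaderRange(from, batchSize))
    println(blockHeadersQueue) // List(HeaderRange(0,3), HeaderRange(3,3), HeaderRange(6,3))

    // Each entry then becomes one GetBlockHeaders(Left(range.from), range.limit, skip = 0, reverse = false).
  }
}
```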
diff --git a/src/main/scala/io/iohk/ethereum/blockchain/sync/fast/FastSyncBranchResolver.scala b/src/main/scala/io/iohk/ethereum/blockchain/sync/fast/FastSyncBranchResolver.scala
new file mode 100644
index 0000000000..c3af0e38c4
--- /dev/null
+++ b/src/main/scala/io/iohk/ethereum/blockchain/sync/fast/FastSyncBranchResolver.scala
@@ -0,0 +1,116 @@
+package io.iohk.ethereum.blockchain.sync.fast
+
+import cats.data.NonEmptyList
+import io.iohk.ethereum.domain.{BlockHeader, Blockchain}
+import io.iohk.ethereum.network.Peer
+import io.iohk.ethereum.utils.Logger
+
+trait FastSyncBranchResolver {
+
+ import FastSyncBranchResolver._
+
+ protected def blockchain: Blockchain
+
+ // TODO [ETCM-676] move to [[Blockchain]] and make sure it's atomic
+ def discardBlocksAfter(lastValidBlock: BigInt): Unit =
+ discardBlocks(lastValidBlock, blockchain.getBestBlockNumber())
+
+ // TODO [ETCM-676] move to [[Blockchain]] and make sure it's atomic
+ private def discardBlocks(fromBlock: BigInt, toBlock: BigInt): Unit = {
+ val blocksToBeRemoved = childOf(fromBlock).to(toBlock).reverse.toList
+ blocksToBeRemoved.foreach { toBeRemoved =>
+ blockchain
+ .getBlockHeaderByNumber(toBeRemoved)
+ .foreach(header => blockchain.removeBlock(header.hash, withState = false))
+ }
+ }
+
+}
+
+object FastSyncBranchResolver {
+
+ /**
+ * Stores the current binary search state:
+ * the first common block is known to lie between minBlockNumber and maxBlockNumber.
+ */
+ final case class SearchState(minBlockNumber: BigInt, maxBlockNumber: BigInt, masterPeer: Peer)
+
+ def parentOf(blockHeaderNumber: BigInt): BigInt = blockHeaderNumber - 1
+ def childOf(blockHeaderNumber: BigInt): BigInt = blockHeaderNumber + 1
+}
+
+/**
+ * Attempts to find the last common block within recent blocks by looking for a parent/child
+ * relationship between our block headers and the remote peer's block headers.
+ */
+class RecentBlocksSearch(blockchain: Blockchain) {
+
+ /**
+ * Finds the highest common block by looking for a block n in our chain that is the parent of the remote candidate block n + 1.
+ */
+ def getHighestCommonBlock(
+ candidateHeaders: Seq[BlockHeader],
+ bestBlockNumber: BigInt
+ ): Option[BigInt] = {
+ def isParent(potentialParent: BigInt, childCandidate: BlockHeader): Boolean =
+ blockchain.getBlockHeaderByNumber(potentialParent).exists { _.isParentOf(childCandidate) }
+ NonEmptyList.fromList(candidateHeaders.reverse.toList).flatMap { remoteHeaders =>
+ val blocksToBeCompared = bestBlockNumber.until(bestBlockNumber - remoteHeaders.size).by(-1).toList
+ remoteHeaders.toList
+ .zip(blocksToBeCompared)
+ .collectFirst {
+ case (childCandidate, parent) if isParent(parent, childCandidate) => parent
+ }
+ }
+ }
+
+}
+
+object BinarySearchSupport extends Logger {
+ import FastSyncBranchResolver._
+
+ sealed trait BinarySearchResult
+ final case class BinarySearchCompleted(highestCommonBlockNumber: BigInt) extends BinarySearchResult
+ final case class ContinueBinarySearch(searchState: SearchState) extends BinarySearchResult
+ case object NoCommonBlock extends BinarySearchResult
+
+ /**
+ * Returns the block number in the middle between min and max.
+ * If there is no middle, it will return the lower value.
+ *
+ * E.g. calling this method with min = 3 and max = 6 will return 4
+ */
+ def middleBlockNumber(min: BigInt, max: BigInt): BigInt = (min + max) / 2
+
+ def blockHeaderNumberToRequest(min: BigInt, max: BigInt): BigInt =
+ childOf(middleBlockNumber(min, max))
+
+ def validateBlockHeaders(
+ parentBlockHeader: BlockHeader,
+ childBlockHeader: BlockHeader,
+ searchState: SearchState
+ ): BinarySearchResult = {
+ val childNum = childBlockHeader.number
+ val parentNum = parentBlockHeader.number
+ val min = searchState.minBlockNumber
+ val max = searchState.maxBlockNumber
+
+ log.debug(
+ "Validating block headers (binary search) for parentBlockHeader {}, remote childBlockHeader {} and search state {}",
+ parentBlockHeader.number,
+ childBlockHeader.number,
+ searchState
+ )
+
+ if (parentBlockHeader.isParentOf(childBlockHeader)) { // chains are still aligned but there might be an even better block
+ if (parentNum == max) BinarySearchCompleted(parentNum)
+ else if (parentNum == min && childNum == max) ContinueBinarySearch(searchState.copy(minBlockNumber = childNum))
+ else ContinueBinarySearch(searchState.copy(minBlockNumber = parentNum))
+ } else { // no parent/child -> chains have diverged before parent block
+ if (min == 1 && max <= 2) NoCommonBlock
+ else if (min == max) BinarySearchCompleted(parentOf(parentNum))
+ else ContinueBinarySearch(searchState.copy(maxBlockNumber = parentOf(parentNum).max(1)))
+ }
+ }
+
+}
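The binary search above probes the peer for a single header at mid + 1 and checks whether our header at mid is its parent, narrowing [min, max] until the highest common block is found. A self-contained sketch of that idea (a plain "find the last block both chains agree on" search, not the actor's exact state machine or its edge cases):

```scala
import scala.annotation.tailrec

object BranchPointSearchSketch {
  // `agreesAt(n)` stands in for "our header n is the parent of the peer's header n + 1",
  // which is what one GetBlockHeaders probe per step establishes.
  def highestCommonBlock(lo0: BigInt, hi0: BigInt)(agreesAt: BigInt => Boolean): Option[BigInt] = {
    @tailrec
    def loop(lo: BigInt, hi: BigInt): BigInt =
      if (lo == hi) lo
      else {
        val mid = (lo + hi + 1) / 2 // bias upwards so the interval always shrinks
        if (agreesAt(mid)) loop(mid, hi) else loop(lo, mid - 1)
      }
    if (!agreesAt(lo0)) None else Some(loop(lo0, hi0))
  }

  def main(args: Array[String]): Unit = {
    // Suppose both chains agree up to block 1234 and diverge afterwards.
    val divergence = BigInt(1234)
    println(highestCommonBlock(1, 10000)(n => n <= divergence)) // Some(1234)
    println(highestCommonBlock(1, 10000)(_ => false))           // None: no common block
  }
}
```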
diff --git a/src/main/scala/io/iohk/ethereum/blockchain/sync/fast/FastSyncBranchResolverActor.scala b/src/main/scala/io/iohk/ethereum/blockchain/sync/fast/FastSyncBranchResolverActor.scala
new file mode 100644
index 0000000000..7afa1b797b
--- /dev/null
+++ b/src/main/scala/io/iohk/ethereum/blockchain/sync/fast/FastSyncBranchResolverActor.scala
@@ -0,0 +1,273 @@
+package io.iohk.ethereum.blockchain.sync.fast
+
+import akka.actor.{Actor, ActorLogging, ActorRef, PoisonPill, Props, Scheduler, Terminated}
+import io.iohk.ethereum.blockchain.sync.Blacklist.BlacklistReason
+import io.iohk.ethereum.blockchain.sync.PeerListSupportNg.PeerWithInfo
+import io.iohk.ethereum.blockchain.sync.PeerRequestHandler.{RequestFailed, ResponseReceived}
+import io.iohk.ethereum.blockchain.sync.fast.FastSyncBranchResolverActor._
+import io.iohk.ethereum.blockchain.sync.{Blacklist, PeerListSupportNg, PeerRequestHandler}
+import io.iohk.ethereum.db.storage.AppStateStorage
+import io.iohk.ethereum.domain.{BlockHeader, Blockchain}
+import io.iohk.ethereum.network.Peer
+import io.iohk.ethereum.network.p2p.messages.Codes
+import io.iohk.ethereum.network.p2p.messages.PV62.{BlockHeaders, GetBlockHeaders}
+import io.iohk.ethereum.utils.Config.SyncConfig
+
+import scala.concurrent.ExecutionContext.Implicits.global
+import scala.concurrent.duration._
+import monix.eval.Coeval
+import monix.catnap.cancelables.AssignableCancelableF.Bool
+import akka.actor.Timers
+import io.iohk.ethereum.blockchain.sync.fast.BinarySearchSupport.ContinueBinarySearch
+
+class FastSyncBranchResolverActor(
+ val fastSync: ActorRef,
+ val peerEventBus: ActorRef,
+ val etcPeerManager: ActorRef,
+ val blockchain: Blockchain,
+ val blacklist: Blacklist,
+ val syncConfig: SyncConfig,
+ val appStateStorage: AppStateStorage,
+ implicit val scheduler: Scheduler
+) extends Actor
+ with ActorLogging
+ with FastSyncBranchResolver
+ with PeerListSupportNg
+ with Timers {
+
+ import FastSyncBranchResolverActor._
+ import FastSyncBranchResolver._
+ import BinarySearchSupport._
+
+ private val recentHeadersSize: Int = syncConfig.blockHeadersPerRequest
+
+ private val recentBlocksSearch: RecentBlocksSearch = new RecentBlocksSearch(blockchain)
+
+ override def receive: Receive = waitingForPeerWithHighestBlock
+
+ private def waitingForPeerWithHighestBlock: Receive = handlePeerListMessages orElse { case StartBranchResolver =>
+ getPeerWithHighestBlock match {
+ case Some(PeerWithInfo(peer, _)) => requestRecentBlockHeaders(peer, blockchain.getBestBlockNumber())
+ case None =>
+ log.info("Waiting for peers, rescheduling StartBranchResolver")
+ timers.startSingleTimer(RestartTimerKey, StartBranchResolver, 1.second)
+ }
+ }
+
+ private def waitingForRecentBlockHeaders(
+ masterPeer: Peer,
+ bestBlockNumber: BigInt,
+ requestHandler: ActorRef
+ ): Receive =
+ handlePeerListMessages orElse {
+ case ResponseReceived(peer, BlockHeaders(headers), timeTaken) if peer == masterPeer =>
+ if (headers.size == recentHeadersSize) {
+ log.debug("Received {} block headers from peer {} in {} ms", headers.size, masterPeer.id, timeTaken)
+ handleRecentBlockHeadersResponse(headers, masterPeer, bestBlockNumber)
+ } else {
+ handleInvalidResponse(peer, requestHandler)
+ }
+ case RequestFailed(peer, reason) => handleRequestFailure(peer, sender(), reason)
+ case Terminated(ref) if ref == requestHandler => handlePeerTermination(masterPeer, ref)
+ }
+
+ private def waitingForBinarySearchBlock(
+ searchState: SearchState,
+ blockHeaderNumberToSearch: BigInt,
+ requestHandler: ActorRef
+ ): Receive = {
+ handlePeerListMessages orElse {
+ case ResponseReceived(peer, BlockHeaders(headers), durationMs) if peer == searchState.masterPeer =>
+ context.unwatch(requestHandler)
+ headers match {
+ case childHeader :: Nil if childHeader.number == blockHeaderNumberToSearch =>
+ log.debug(ReceivedBlockHeaderLog, blockHeaderNumberToSearch, peer.id, durationMs)
+ handleBinarySearchBlockHeaderResponse(searchState, childHeader)
+ case _ =>
+ log.warning(ReceivedWrongHeaders, blockHeaderNumberToSearch, headers.map(_.number))
+ handleInvalidResponse(peer, requestHandler)
+ }
+ case RequestFailed(peer, reason) => handleRequestFailure(peer, sender(), reason)
+ case Terminated(ref) if ref == requestHandler => handlePeerTermination(searchState.masterPeer, ref)
+ case Terminated(_) => () // ignore
+ }
+ }
+
+ private def requestRecentBlockHeaders(masterPeer: Peer, bestBlockNumber: BigInt): Unit = {
+ val requestHandler = sendGetBlockHeadersRequest(
+ masterPeer,
+ fromBlock = childOf((bestBlockNumber - recentHeadersSize).max(0)),
+ amount = recentHeadersSize
+ )
+ context.become(waitingForRecentBlockHeaders(masterPeer, bestBlockNumber, requestHandler))
+ }
+
+ /**
+ * Searches recent blocks for a valid parent/child relationship.
+ * If we don't find one, we switch to binary search.
+ */
+ private def handleRecentBlockHeadersResponse(
+ blockHeaders: Seq[BlockHeader],
+ masterPeer: Peer,
+ bestBlockNumber: BigInt
+ ): Unit = {
+ recentBlocksSearch.getHighestCommonBlock(blockHeaders, bestBlockNumber) match {
+ case Some(highestCommonBlockNumber) =>
+ finalizeBranchResolver(highestCommonBlockNumber, masterPeer)
+ case None =>
+ log.info(SwitchToBinarySearchLog, recentHeadersSize)
+ requestBlockHeaderForBinarySearch(
+ SearchState(minBlockNumber = 1, maxBlockNumber = bestBlockNumber, masterPeer)
+ )
+ }
+ }
+
+ private def requestBlockHeaderForBinarySearch(searchState: SearchState): Unit = {
+ val headerNumberToRequest = blockHeaderNumberToRequest(searchState.minBlockNumber, searchState.maxBlockNumber)
+ val handler = sendGetBlockHeadersRequest(searchState.masterPeer, headerNumberToRequest, 1)
+ context.become(waitingForBinarySearchBlock(searchState, headerNumberToRequest, handler))
+ }
+
+ private def handleBinarySearchBlockHeaderResponse(searchState: SearchState, childHeader: BlockHeader): Unit = {
+ import BinarySearchSupport._
+ blockchain.getBlockHeaderByNumber(parentOf(childHeader.number)) match {
+ case Some(parentHeader) =>
+ validateBlockHeaders(parentHeader, childHeader, searchState) match {
+ case NoCommonBlock => stopWithFailure(BranchResolutionFailed.noCommonBlock)
+ case BinarySearchCompleted(highestCommonBlockNumber) =>
+ finalizeBranchResolver(highestCommonBlockNumber, searchState.masterPeer)
+ case ContinueBinarySearch(newSearchState) =>
+ log.debug(s"Continuing binary search with new search state: $newSearchState")
+ requestBlockHeaderForBinarySearch(newSearchState)
+ }
+ case None => stopWithFailure(BranchResolutionFailed.blockHeaderNotFound(childHeader.number))
+ }
+ }
+
+ private def finalizeBranchResolver(firstCommonBlockNumber: BigInt, masterPeer: Peer): Unit = {
+ discardBlocksAfter(firstCommonBlockNumber)
+ log.info(s"Branch resolution completed with first common block number [$firstCommonBlockNumber]")
+ fastSync ! BranchResolvedSuccessful(highestCommonBlockNumber = firstCommonBlockNumber, masterPeer = masterPeer)
+ context.stop(self)
+ }
+
+ /**
+ * In case of fatal errors (and to prevent retrying forever), the branch resolver signals fast-sync about
+ * the error and lets fast-sync decide whether to issue another request.
+ */
+ private def stopWithFailure(response: BranchResolutionFailed): Unit = {
+ fastSync ! response
+ context.stop(self)
+ }
+
+ private def sendGetBlockHeadersRequest(peer: Peer, fromBlock: BigInt, amount: BigInt): ActorRef = {
+ val handler = context.actorOf(
+ PeerRequestHandler.props[GetBlockHeaders, BlockHeaders](
+ peer,
+ syncConfig.peerResponseTimeout,
+ etcPeerManager,
+ peerEventBus,
+ requestMsg = GetBlockHeaders(Left(fromBlock), amount, skip = 0, reverse = false),
+ responseMsgCode = Codes.BlockHeadersCode
+ )
+ )
+ context.watch(handler)
+ handler
+ }
+
+ private def handleInvalidResponse(peer: Peer, peerRef: ActorRef): Unit = {
+ log.warning(s"Received invalid response from peer [${peer.id}]. Restarting branch resolver.")
+ context.unwatch(peerRef)
+ blacklistIfHandshaked(
+ peer.id,
+ syncConfig.blacklistDuration,
+ BlacklistReason.WrongBlockHeaders
+ )
+ restart()
+ }
+
+ private def handleRequestFailure(peer: Peer, peerRef: ActorRef, reason: String): Unit = {
+ log.warning(s"Request to peer [${peer.id}] failed: [$reason]. Restarting branch resolver.")
+ context.unwatch(peerRef)
+ blacklistIfHandshaked(
+ peer.id,
+ syncConfig.blacklistDuration,
+ BlacklistReason.RequestFailed(reason)
+ )
+ restart()
+ }
+
+ private def handlePeerTermination(peer: Peer, peerHandlerRef: ActorRef): Unit = {
+ log.warning(peerTerminatedLog, peerHandlerRef.path.name, peer.id)
+ restart()
+ }
+
+ private def restart(): Unit = {
+ context.become(waitingForPeerWithHighestBlock)
+ self ! StartBranchResolver
+ }
+
+}
+
+object FastSyncBranchResolverActor {
+
+ protected val RestartTimerKey: String = "Restart"
+
+ protected val InvalidHeadersResponseLog: String =
+ "Invalid response - Received {} block headers from peer {}. Requested {} headers. Current master peer {}"
+
+ protected val SwitchToBinarySearchLog: String =
+ "Branch diverged earlier than {} blocks ago. Switching to binary search to determine first common block."
+
+ protected val ReceivedBlockHeaderLog: String =
+ "Received requested block header [{}] from peer [{}] in {} ms"
+
+ protected val ReceivedWrongHeaders: String =
+ "Received invalid response when requesting block header [{}]. Received: {}"
+
+ protected val peerTerminatedLog: String =
+ "Peer request handler [{}] for peer [{}] terminated. Restarting branch resolver."
+
+ def props(
+ fastSync: ActorRef,
+ peerEventBus: ActorRef,
+ etcPeerManager: ActorRef,
+ blockchain: Blockchain,
+ blacklist: Blacklist,
+ syncConfig: SyncConfig,
+ appStateStorage: AppStateStorage,
+ scheduler: Scheduler
+ ): Props =
+ Props(
+ new FastSyncBranchResolverActor(
+ fastSync,
+ peerEventBus,
+ etcPeerManager,
+ blockchain,
+ blacklist,
+ syncConfig,
+ appStateStorage,
+ scheduler
+ )
+ )
+
+ sealed trait BranchResolverRequest
+ case object StartBranchResolver extends BranchResolverRequest
+
+ sealed trait BranchResolverResponse
+ final case class BranchResolvedSuccessful(highestCommonBlockNumber: BigInt, masterPeer: Peer)
+ extends BranchResolverResponse
+ import BranchResolutionFailed._
+ final case class BranchResolutionFailed(failure: BranchResolutionFailure)
+ object BranchResolutionFailed {
+ def noCommonBlock: BranchResolutionFailed = BranchResolutionFailed(NoCommonBlockFound)
+ def blockHeaderNotFound(blockHeaderNum: BigInt): BranchResolutionFailed = BranchResolutionFailed(
+ BlockHeaderNotFound(blockHeaderNum)
+ )
+
+ sealed trait BranchResolutionFailure
+ final case object NoCommonBlockFound extends BranchResolutionFailure
+ final case class BlockHeaderNotFound(blockHeaderNum: BigInt) extends BranchResolutionFailure
+ }
+
+}
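Before falling back to binary search, the actor first checks the most recent blocks for a parent/child link. A simplified sketch of that idea (not RecentBlocksSearch's exact zip-based implementation, which compares against headers loaded from the local Blockchain):

```scala
object RecentBlocksSearchSketch {
  // Simplified header: number plus parent linkage by hash.
  final case class H(number: BigInt, hash: String, parentHash: String)

  // Given our recent headers and the peer's headers for the same range, the highest of our
  // blocks that is the parent of the peer's next block is the highest common block.
  def highestCommonBlock(ours: Map[BigInt, H], remote: Seq[H]): Option[BigInt] =
    remote
      .sortBy(_.number)(Ordering[BigInt].reverse)
      .collectFirst {
        case child if ours.get(child.number - 1).exists(_.hash == child.parentHash) => child.number - 1
      }

  def main(args: Array[String]): Unit = {
    // Our chain: 1 -> 2 -> 3 (hashes a, b, c). The peer agrees up to block 2, then forks.
    val ours = List(H(1, "a", "0"), H(2, "b", "a"), H(3, "c", "b")).map(h => h.number -> h).toMap
    val remote = Seq(H(2, "b", "a"), H(3, "c'", "b"), H(4, "d'", "c'"))
    println(highestCommonBlock(ours, remote)) // Some(2): our block 2 is the parent of the peer's block 3
  }
}
```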
diff --git a/src/main/scala/io/iohk/ethereum/blockchain/sync/fast/HeaderSkeleton.scala b/src/main/scala/io/iohk/ethereum/blockchain/sync/fast/HeaderSkeleton.scala
new file mode 100644
index 0000000000..eee8961577
--- /dev/null
+++ b/src/main/scala/io/iohk/ethereum/blockchain/sync/fast/HeaderSkeleton.scala
@@ -0,0 +1,208 @@
+package io.iohk.ethereum.blockchain.sync.fast
+
+import io.iohk.ethereum.blockchain.sync.fast.HeaderSkeleton._
+import io.iohk.ethereum.domain.BlockHeader
+
+/**
+ * This class contains the state of the current skeleton being downloaded. This state is represented as the downloaded
+ * skeleton headers plus the downloaded batches.
+ * A skeleton of block headers consists of `limit` headers, separated by `gapSize` blocks in between.
+ * A batch of blocks is a sequence of `gapSize + 1` block headers starting one block after the previous skeleton
+ * header up to the next skeleton header inclusive.
+ * When a batch of headers is downloaded, it is checked against the current skeleton and if it is correct, we save it
+ * into the state.
+ * When all batches filling the gaps are downloaded, this skeleton is considered full and the `fullChain` can be
+ * requested.
+ *
+ * Example:
+ * Given from = 0, to = 10, maxSkeletonHeaders = 3
+ * Then:
+ * - firstSkeletonHeaderNumber = 2
+ * - gapSize = 2
+ * - batchSize = 3
+ * - skeletonHeaderNumbers = Seq(2, 5, 8)
+ * - batchStartingHeaderNumbers = Seq(0, 3, 6)
+ *
+ * batch gap batch
+ * /-------------------\ /-----------\ /------------------\
+ * 0 1 2 3 4 5 6 7 8 9 10
+ * | | | | |
+ * from 1stSkeletonHeader 2ndSkeletonHeader lastSkeletonHeader to
+ *
+ * @param from Lower bound for this skeleton, inclusive
+ * @param to Upper bound for this skeleton, inclusive
+ * @param maxSkeletonHeaders Maximum number of skeleton headers
+ * @param skeletonHeaders The currently downloaded skeleton headers. May be empty if none were downloaded. This is set
+ * by using `setSkeletonHeaders`
+ * @param batches The currently downloaded batches. This is filled in by using `addBatch`
+ */
+final case class HeaderSkeleton(
+ from: BigInt,
+ to: BigInt,
+ maxSkeletonHeaders: Int,
+ private val skeletonHeaders: Seq[BlockHeader] = Seq.empty,
+ private val batches: Map[BigInt, Seq[BlockHeader]] = Map.empty
+) {
+
+ private val remainingBlocks: BigInt = to - from + 1
+
+ /**
+ * Number of batched headers to request to a peer
+ */
+ val batchSize: BigInt = remainingBlocks.min(maxSkeletonHeaders)
+
+ /**
+ * Number of blocks in between each skeleton header
+ */
+ val gapSize: BigInt = batchSize - 1
+
+ /**
+ * Not to be confused with `from`. This is the number of the first header in the skeleton.
+ */
+ val firstSkeletonHeaderNumber: BigInt = from + gapSize
+
+ /**
+ * Maximum number of blocks to be downloaded at once. This is the total number of blocks that the skeleton contains.
+ */
+ val limit: BigInt = {
+ val remainingSkeletonHeaders = remainingBlocks / batchSize + (remainingBlocks % batchSize).min(1)
+ remainingSkeletonHeaders.min(maxSkeletonHeaders)
+ }
+
+ val lastSkeletonHeaderNumber: BigInt = from + (batchSize * limit) - 1
+ private val skeletonHeaderNumbers: Seq[BigInt] =
+ firstSkeletonHeaderNumber to lastSkeletonHeaderNumber by batchSize
+
+ def validateAndSetSkeletonHeaders(headers: Seq[BlockHeader]): Either[HeaderSkeletonError, HeaderSkeleton] =
+ for {
+ _ <- checkSkeletonHeadersTotal(headers)
+ _ <- checkSkeletonHeaderNumbers(headers)
+ } yield copy(skeletonHeaders = headers)
+
+ /**
+ * Use this method to update this state with the downloaded skeleton
+ * @param headers The downloaded skeleton
+ * @return Either the updated structure if the validation succeeded or an error
+ */
+ def setSkeletonHeaders(headers: Seq[BlockHeader]): Either[HeaderSkeletonError, HeaderSkeleton] =
+ for {
+ _ <- checkSkeletonHeadersTotal(headers)
+ _ <- checkSkeletonHeaderNumbers(headers)
+ } yield copy(skeletonHeaders = headers)
+
+ private def checkSkeletonHeadersTotal(headers: Seq[BlockHeader]): Either[HeaderSkeletonError, Unit] =
+ Either.cond(headers.size == limit, (), InvalidTotalHeaders(headers.size, limit.toInt))
+
+ private def checkSkeletonHeaderNumbers(headers: Seq[BlockHeader]): Either[HeaderSkeletonError, Unit] = {
+ val downloadedHeaderNumbers = headers.map(_.number)
+ val isValid = downloadedHeaderNumbers.zip(skeletonHeaderNumbers).forall {
+ case (downloadedHeaderNumber, skeletonNumber) => downloadedHeaderNumber == skeletonNumber
+ }
+ Either.cond(isValid, (), InvalidHeaderNumber(downloadedHeaderNumbers, skeletonHeaderNumbers))
+ }
+
+ /**
+ * An ordered sequence with the numbers of the first block of each batch
+ */
+ val batchStartingHeaderNumbers: Seq[BigInt] = from +: skeletonHeaderNumbers.dropRight(1).map(_ + 1)
+
+ /**
+ * Use this method to update this state with a downloaded batch of headers
+ * @param batchHeaders The downloaded batch of headers
+ * @return Either the updated structure if the validation succeeded or an error
+ */
+ def addBatch(batchHeaders: Seq[BlockHeader]): Either[HeaderSkeletonError, HeaderSkeleton] =
+ for {
+ skeletonHeader <- findSkeletonHeader(batchHeaders)
+ _ <- checkSkeletonParentHash(batchHeaders, skeletonHeader)
+ batchStartingNumber <- findBatchStartingNumber(batchHeaders)
+ } yield copy(batches = batches + (batchStartingNumber -> batchHeaders))
+
+ private def findSkeletonHeader(batchHeaders: Seq[BlockHeader]): Either[HeaderBatchError, BlockHeader] = {
+ batchHeaders.lastOption match {
+ case Some(header) =>
+ for {
+ byNumber <- findSkeletonHeaderByNumber(header)
+ byHash <- Either.cond(byNumber.hash == header.hash, byNumber, InvalidBatchHash(header, byNumber))
+ } yield byHash
+ case None =>
+ Left(EmptyDownloadedBatch(skeletonHeaderNumbers))
+ }
+ }
+
+ private def findSkeletonHeaderByNumber(header: BlockHeader): Either[InvalidBatchLastNumber, BlockHeader] = {
+ skeletonHeaders
+ .find(_.number == header.number)
+ .toRight(InvalidBatchLastNumber(header.number, skeletonHeaderNumbers))
+ }
+
+ private def checkSkeletonParentHash(
+ batchHeaders: Seq[BlockHeader],
+ skeletonHeader: BlockHeader
+ ): Either[HeaderBatchError, Unit] = {
+ batchHeaders.dropRight(1).lastOption match {
+ case Some(penultimateBatchHeader) if penultimateBatchHeader.hash != skeletonHeader.parentHash =>
+ Left(InvalidPenultimateHeader(penultimateBatchHeader, skeletonHeader))
+ case _ =>
+ Right(())
+ }
+ }
+
+ private def findBatchStartingNumber(batchHeaders: Seq[BlockHeader]): Either[HeaderBatchError, BigInt] = {
+ batchHeaders.headOption.map(_.number) match {
+ case Some(firstBatchHeader) =>
+ val found = batchStartingHeaderNumbers.find(_ == firstBatchHeader)
+ found.toRight(InvalidBatchFirstNumber(firstBatchHeader, batchStartingHeaderNumbers))
+ case None =>
+ Left(EmptyDownloadedBatch(skeletonHeaderNumbers))
+ }
+ }
+
+ private val isFull: Boolean = batchStartingHeaderNumbers.forall(batches.contains)
+
+ /**
+ * The complete skeleton plus the filled in batches, or `None` if not everything was downloaded
+ */
+ val fullChain: Option[Seq[BlockHeader]] =
+ if (isFull) Some(batchStartingHeaderNumbers.flatMap(batches.apply))
+ else None
+}
+
+object HeaderSkeleton {
+
+ sealed trait HeaderSkeletonError {
+ def msg: String
+ }
+ final case class NoCurrentHeaderSkeleton(downloaded: Int, expected: Int) extends HeaderSkeletonError {
+ override def msg: String = s"Invalid downloaded total headers. Expected $expected but was $downloaded"
+ }
+ final case class InvalidTotalHeaders(downloaded: Int, expected: Int) extends HeaderSkeletonError {
+ override def msg: String = s"Invalid downloaded total headers. Expected $expected but was $downloaded"
+ }
+ final case class InvalidHeaderNumber(downloaded: Seq[BigInt], expected: Seq[BigInt]) extends HeaderSkeletonError {
+ override def msg: String =
+ s"Invalid sequence of skeleton headers. Expected [${expected.mkString(",")}] but was [${downloaded.mkString(",")}]"
+ }
+
+ sealed trait HeaderBatchError extends HeaderSkeletonError
+ final case class InvalidBatchLastNumber(downloaded: BigInt, expected: Seq[BigInt]) extends HeaderBatchError {
+ override def msg: String = s"Invalid batch last number. $downloaded wasn't found in [${expected.mkString(",")}]"
+ }
+ final case class InvalidBatchHash(downloaded: BlockHeader, expected: BlockHeader) extends HeaderBatchError {
+ override def msg: String = s"Invalid batch last block hash. Expected $expected but was $downloaded"
+ }
+ final case class EmptyDownloadedBatch(expected: Seq[BigInt]) extends HeaderBatchError {
+ override def msg: String = s"Downloaded empty headers batch. Expected [${expected.mkString(",")}]"
+ }
+ final case class InvalidPenultimateHeader(penultimateBatchHeader: BlockHeader, skeletonHeader: BlockHeader)
+ extends HeaderBatchError {
+ override def msg: String =
+ s"Invalid batch penultimate header. $penultimateBatchHeader isn't parent of $skeletonHeader"
+ }
+ final case class InvalidBatchFirstNumber(downloaded: BigInt, expected: Seq[BigInt]) extends HeaderBatchError {
+ override def msg: String = s"Invalid batch first number. $downloaded wasn't found in [${expected.mkString(",")}]"
+ }
+ final case class InvalidDownloadedChain(downloaded: Seq[BlockHeader]) extends HeaderBatchError {
+ override def msg: String = s"Invalid downloaded batch: ${downloaded.map(h => h.number -> h.hash).mkString(", ")}"
+ }
+}
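A minimal sketch of the batch-numbering invariant implemented by `batchStartingHeaderNumbers` above, with illustrative values (the names `from`, `batchSize`, `skeletonNumbers` and `batchStarts` are assumptions for this example, not identifiers from the patch): each skeleton header is the last header of its batch, so every batch after the first starts one block past the previous skeleton header.

    object HeaderSkeletonNumbersSketch extends App {
      // Illustrative setup: download headers 1..30 in three batches of 10.
      val from: BigInt = 1
      val batchSize: BigInt = 10
      val batchCount = 3

      // Skeleton headers are the last header of each batch: 10, 20, 30.
      val skeletonNumbers: Seq[BigInt] = (1 to batchCount).map(i => from - 1 + batchSize * i)

      // Same derivation as `batchStartingHeaderNumbers` in the patch: the first batch
      // starts at `from`, every later batch starts right after the previous skeleton header.
      val batchStarts: Seq[BigInt] = from +: skeletonNumbers.dropRight(1).map(_ + 1)

      assert(batchStarts == Seq(BigInt(1), BigInt(11), BigInt(21)))
    }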
diff --git a/src/main/scala/io/iohk/ethereum/blockchain/sync/fast/SyncBlocksValidator.scala b/src/main/scala/io/iohk/ethereum/blockchain/sync/fast/SyncBlocksValidator.scala
index f65780cd2e..41aadb2fb8 100644
--- a/src/main/scala/io/iohk/ethereum/blockchain/sync/fast/SyncBlocksValidator.scala
+++ b/src/main/scala/io/iohk/ethereum/blockchain/sync/fast/SyncBlocksValidator.scala
@@ -1,7 +1,7 @@
package io.iohk.ethereum.blockchain.sync.fast
import akka.util.ByteString
-import io.iohk.ethereum.consensus.validators.Validators
+import io.iohk.ethereum.consensus.validators.{BlockHeaderError, BlockHeaderValid, Validators}
import io.iohk.ethereum.consensus.validators.std.StdBlockValidator
import io.iohk.ethereum.consensus.validators.std.StdBlockValidator.BlockValid
import io.iohk.ethereum.domain.{BlockBody, BlockHeader, Blockchain}
@@ -31,6 +31,9 @@ trait SyncBlocksValidator {
result
}
+ def validateHeaderOnly(blockHeader: BlockHeader): Either[BlockHeaderError, BlockHeaderValid] =
+ validators.blockHeaderValidator.validateHeaderOnly(blockHeader)
+
def checkHeadersChain(headers: Seq[BlockHeader]): Boolean =
if (headers.length > 1) headers.zip(headers.tail).forall { case (parent, child) =>
parent.hash == child.parentHash && parent.number + 1 == child.number
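For reference, a self-contained sketch of the pairwise invariant that `checkHeadersChain` enforces, using a stripped-down stand-in for `BlockHeader` (the `MiniHeader` type and `checkChain` helper are invented here purely for illustration):

    final case class MiniHeader(number: BigInt, hash: String, parentHash: String)

    def checkChain(headers: Seq[MiniHeader]): Boolean =
      if (headers.length > 1)
        headers.zip(headers.tail).forall { case (parent, child) =>
          parent.hash == child.parentHash && parent.number + 1 == child.number
        }
      else true

    val ok = Seq(MiniHeader(1, "a", "genesis"), MiniHeader(2, "b", "a"), MiniHeader(3, "c", "b"))
    assert(checkChain(ok))          // consecutive numbers and linked hashes
    assert(!checkChain(ok.reverse)) // broken linkage is rejected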
diff --git a/src/main/scala/io/iohk/ethereum/consensus/ethash/validators/EthashBlockHeaderValidator.scala b/src/main/scala/io/iohk/ethereum/consensus/ethash/validators/EthashBlockHeaderValidator.scala
index c7793e8388..ab6e2e205f 100644
--- a/src/main/scala/io/iohk/ethereum/consensus/ethash/validators/EthashBlockHeaderValidator.scala
+++ b/src/main/scala/io/iohk/ethereum/consensus/ethash/validators/EthashBlockHeaderValidator.scala
@@ -23,10 +23,7 @@ class EthashBlockHeaderValidator(blockchainConfig: BlockchainConfig)
protected def difficulty: DifficultyCalculator = DifficultyCalculator(blockchainConfig)
- def validateEvenMore(
- blockHeader: BlockHeader,
- parentHeader: BlockHeader
- ): Either[BlockHeaderError, BlockHeaderValid] =
+ def validateEvenMore(blockHeader: BlockHeader): Either[BlockHeaderError, BlockHeaderValid] =
validatePoW(blockHeader)
/**
diff --git a/src/main/scala/io/iohk/ethereum/consensus/ethash/validators/MockedPowBlockHeaderValidator.scala b/src/main/scala/io/iohk/ethereum/consensus/ethash/validators/MockedPowBlockHeaderValidator.scala
index a88daacaee..d2f5a59d77 100644
--- a/src/main/scala/io/iohk/ethereum/consensus/ethash/validators/MockedPowBlockHeaderValidator.scala
+++ b/src/main/scala/io/iohk/ethereum/consensus/ethash/validators/MockedPowBlockHeaderValidator.scala
@@ -11,10 +11,7 @@ class MockedPowBlockHeaderValidator(blockchainConfig: BlockchainConfig)
protected def difficulty: DifficultyCalculator = DifficultyCalculator(blockchainConfig)
- def validateEvenMore(
- blockHeader: BlockHeader,
- parentHeader: BlockHeader
- ): Either[BlockHeaderError, BlockHeaderValid] =
+ override def validateEvenMore(blockHeader: BlockHeader): Either[BlockHeaderError, BlockHeaderValid] =
Right(BlockHeaderValid)
}
diff --git a/src/main/scala/io/iohk/ethereum/consensus/validators/BlockHeaderValidator.scala b/src/main/scala/io/iohk/ethereum/consensus/validators/BlockHeaderValidator.scala
index 1c8c39d58f..668c876112 100644
--- a/src/main/scala/io/iohk/ethereum/consensus/validators/BlockHeaderValidator.scala
+++ b/src/main/scala/io/iohk/ethereum/consensus/validators/BlockHeaderValidator.scala
@@ -13,6 +13,8 @@ trait BlockHeaderValidator {
blockHeader: BlockHeader,
getBlockHeaderByHash: GetBlockHeaderByHash
): Either[BlockHeaderError, BlockHeaderValid]
+
+ def validateHeaderOnly(blockHeader: BlockHeader): Either[BlockHeaderError, BlockHeaderValid]
}
object BlockHeaderValidator {
diff --git a/src/main/scala/io/iohk/ethereum/consensus/validators/BlockHeaderValidatorSkeleton.scala b/src/main/scala/io/iohk/ethereum/consensus/validators/BlockHeaderValidatorSkeleton.scala
index c11d6b02d0..d77602f251 100644
--- a/src/main/scala/io/iohk/ethereum/consensus/validators/BlockHeaderValidatorSkeleton.scala
+++ b/src/main/scala/io/iohk/ethereum/consensus/validators/BlockHeaderValidatorSkeleton.scala
@@ -33,10 +33,7 @@ abstract class BlockHeaderValidatorSkeleton(blockchainConfig: BlockchainConfig)
* A hook where even more consensus-specific validation can take place.
* For example, PoW validation is done here.
*/
- protected def validateEvenMore(
- blockHeader: BlockHeader,
- parentHeader: BlockHeader
- ): Either[BlockHeaderError, BlockHeaderValid]
+ protected def validateEvenMore(blockHeader: BlockHeader): Either[BlockHeaderError, BlockHeaderValid]
/** This method allows validate a BlockHeader (stated on
* section 4.4.4 of http://paper.gavwood.com/).
@@ -55,7 +52,7 @@ abstract class BlockHeaderValidatorSkeleton(blockchainConfig: BlockchainConfig)
* @param blockHeader BlockHeader to validate.
* @param getBlockHeaderByHash function to obtain the parent.
*/
- def validate(
+ override def validate(
blockHeader: BlockHeader,
getBlockHeaderByHash: GetBlockHeaderByHash
): Either[BlockHeaderError, BlockHeaderValid] = {
@@ -87,7 +84,7 @@ abstract class BlockHeaderValidatorSkeleton(blockchainConfig: BlockchainConfig)
_ <- validateGasLimit(blockHeader, parentHeader)
_ <- validateNumber(blockHeader, parentHeader)
_ <- validateExtraFields(blockHeader)
- _ <- validateEvenMore(blockHeader, parentHeader)
+ _ <- validateEvenMore(blockHeader)
} yield BlockHeaderValid
}
@@ -242,4 +239,12 @@ abstract class BlockHeaderValidatorSkeleton(blockchainConfig: BlockchainConfig)
}
}
+ override def validateHeaderOnly(blockHeader: BlockHeader): Either[BlockHeaderError, BlockHeaderValid] = {
+ for {
+ _ <- validateExtraData(blockHeader)
+ _ <- validateGasUsed(blockHeader)
+ _ <- validateExtraFields(blockHeader)
+ _ <- validateEvenMore(blockHeader)
+ } yield BlockHeaderValid
+ }
}
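A minimal usage sketch of the new `validateHeaderOnly` hook (illustrative only, not part of the patch): validating a freshly downloaded batch whose parents may not be in the local chain yet, stopping at the first failure. The helper name `validateBatch` is an assumption for this example.

    import io.iohk.ethereum.consensus.validators.{BlockHeaderError, BlockHeaderValidator}
    import io.iohk.ethereum.domain.BlockHeader

    def validateBatch(
        validator: BlockHeaderValidator,
        headers: Seq[BlockHeader]
    ): Either[BlockHeaderError, Unit] =
      headers.foldLeft[Either[BlockHeaderError, Unit]](Right(())) { (acc, header) =>
        acc.flatMap(_ => validator.validateHeaderOnly(header).map(_ => ()))
      }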
diff --git a/src/main/scala/io/iohk/ethereum/domain/Transaction.scala b/src/main/scala/io/iohk/ethereum/domain/Transaction.scala
index 9669c9d21b..d3b7f70baf 100644
--- a/src/main/scala/io/iohk/ethereum/domain/Transaction.scala
+++ b/src/main/scala/io/iohk/ethereum/domain/Transaction.scala
@@ -34,7 +34,7 @@ case class Transaction(
override def toString: String = {
val receivingAddressString =
- if (receivingAddress.isDefined) Hex.toHexString(receivingAddress.get.toArray) else "[Contract creation]"
+ receivingAddress.map(addr => Hex.toHexString(addr.toArray)).getOrElse("[Contract creation]")
s"Transaction {" +
s"nonce: $nonce " +
diff --git a/src/main/scala/io/iohk/ethereum/testmode/TestmodeConsensus.scala b/src/main/scala/io/iohk/ethereum/testmode/TestmodeConsensus.scala
index af1369e4c8..e051a24349 100644
--- a/src/main/scala/io/iohk/ethereum/testmode/TestmodeConsensus.scala
+++ b/src/main/scala/io/iohk/ethereum/testmode/TestmodeConsensus.scala
@@ -30,7 +30,15 @@ class TestmodeConsensus(
override def config: FullConsensusConfig[AnyRef] = FullConsensusConfig[AnyRef](consensusConfig, "")
class TestValidators extends Validators {
- override def blockHeaderValidator: BlockHeaderValidator = (_, _) => Right(BlockHeaderValid)
+ override def blockHeaderValidator: BlockHeaderValidator = new BlockHeaderValidator {
+ override def validate(
+ blockHeader: BlockHeader,
+ getBlockHeaderByHash: GetBlockHeaderByHash
+ ): Either[BlockHeaderError, BlockHeaderValid] = Right(BlockHeaderValid)
+
+ override def validateHeaderOnly(blockHeader: BlockHeader): Either[BlockHeaderError, BlockHeaderValid] =
+ Right(BlockHeaderValid)
+ }
override def signedTransactionValidator: SignedTransactionValidator =
new StdSignedTransactionValidator(blockchainConfig)
override def validateBlockBeforeExecution(
diff --git a/src/main/scala/io/iohk/ethereum/utils/Config.scala b/src/main/scala/io/iohk/ethereum/utils/Config.scala
index ba07cb8a06..857c7a0c5f 100644
--- a/src/main/scala/io/iohk/ethereum/utils/Config.scala
+++ b/src/main/scala/io/iohk/ethereum/utils/Config.scala
@@ -134,7 +134,8 @@ object Config {
stateSyncBloomFilterSize: Int,
stateSyncPersistBatchSize: Int,
pivotBlockReScheduleInterval: FiniteDuration,
- maxPivotBlockAge: Int
+ maxPivotBlockAge: Int,
+ fastSyncMaxBatchRetries: Int
)
object SyncConfig {
@@ -177,7 +178,8 @@ object Config {
stateSyncBloomFilterSize = syncConfig.getInt("state-sync-bloom-filter-size"),
stateSyncPersistBatchSize = syncConfig.getInt("state-sync-persist-batch-size"),
pivotBlockReScheduleInterval = syncConfig.getDuration("pivot-block-reschedule-interval").toMillis.millis,
- maxPivotBlockAge = syncConfig.getInt("max-pivot-block-age")
+ maxPivotBlockAge = syncConfig.getInt("max-pivot-block-age"),
+ fastSyncMaxBatchRetries = syncConfig.getInt("fast-sync-max-batch-retries")
)
}
}
diff --git a/src/main/scala/io/iohk/ethereum/utils/Logger.scala b/src/main/scala/io/iohk/ethereum/utils/Logger.scala
index 3c77c941b9..fc8f9038a2 100644
--- a/src/main/scala/io/iohk/ethereum/utils/Logger.scala
+++ b/src/main/scala/io/iohk/ethereum/utils/Logger.scala
@@ -4,11 +4,11 @@ import com.typesafe.scalalogging
import org.slf4j.{LoggerFactory, MDC}
trait Logger {
- val log: scalalogging.Logger = com.typesafe.scalalogging.Logger(LoggerFactory.getLogger(getClass))
+ protected val log: scalalogging.Logger = com.typesafe.scalalogging.Logger(LoggerFactory.getLogger(getClass))
}
trait LazyLogger {
- lazy val log: scalalogging.Logger = com.typesafe.scalalogging.Logger(LoggerFactory.getLogger(getClass))
+ protected lazy val log: scalalogging.Logger = com.typesafe.scalalogging.Logger(LoggerFactory.getLogger(getClass))
}
trait LoggingContext {
diff --git a/src/test/resources/application.conf b/src/test/resources/application.conf
index 40fd938757..b2e4307f69 100644
--- a/src/test/resources/application.conf
+++ b/src/test/resources/application.conf
@@ -183,3 +183,13 @@ faucet {
# some super slow tests should be skipped on CI (eg. mining)
skip-super-slow-tests = false
skip-super-slow-tests = ${?CI}
+
+akka {
+ loggers = ["akka.event.slf4j.Slf4jLogger"]
+ # Not using ${logging.logs-level} because it might be set to TRACE, which our version of Akka doesn't have.
+ loglevel = "DEBUG"
+ logging-filter = "akka.event.slf4j.Slf4jLoggingFilter"
+ logger-startup-timeout = 30s
+ log-dead-letters = off
+
+}
diff --git a/src/test/resources/logback-test.xml b/src/test/resources/logback-test.xml
index ce749b4f1f..f22f8ff4cc 100644
--- a/src/test/resources/logback-test.xml
+++ b/src/test/resources/logback-test.xml
@@ -8,6 +8,7 @@
+
diff --git a/src/test/scala/io/iohk/ethereum/Mocks.scala b/src/test/scala/io/iohk/ethereum/Mocks.scala
index 5f9d12f732..c2c577e4ca 100644
--- a/src/test/scala/io/iohk/ethereum/Mocks.scala
+++ b/src/test/scala/io/iohk/ethereum/Mocks.scala
@@ -67,8 +67,16 @@ object Mocks {
override def validateHeaderAndBody(blockHeader: BlockHeader, blockBody: BlockBody) = Right(BlockValid)
}
- override val blockHeaderValidator: BlockHeaderValidator = (_: BlockHeader, _: GetBlockHeaderByHash) =>
- Right(BlockHeaderValid)
+ override val blockHeaderValidator: BlockHeaderValidator = new BlockHeaderValidator {
+ override def validate(
+ blockHeader: BlockHeader,
+ getBlockHeaderByHash: GetBlockHeaderByHash
+ ): Either[BlockHeaderError, BlockHeaderValid] = Right(BlockHeaderValid)
+
+ override def validateHeaderOnly(
+ blockHeader: BlockHeader
+ ): Either[BlockHeaderError, BlockHeaderValid] = Right(BlockHeaderValid)
+ }
override val ommersValidator: OmmersValidator =
(_: ByteString, _: BigInt, _: Seq[BlockHeader], _: GetBlockHeaderByHash, _: GetNBlocksBack) => Right(OmmersValid)
@@ -84,8 +92,14 @@ object Mocks {
(_: SignedTransaction, _: Account, _: BlockHeader, _: UInt256, _: BigInt) =>
Left(SignedTransactionError.TransactionSignatureError)
- override val blockHeaderValidator: BlockHeaderValidator = (_: BlockHeader, _: GetBlockHeaderByHash) =>
- Left(HeaderNumberError)
+ override val blockHeaderValidator: BlockHeaderValidator = new BlockHeaderValidator {
+ override def validate(
+ blockHeader: BlockHeader,
+ getBlockHeaderByHash: GetBlockHeaderByHash
+ ): Either[BlockHeaderError, BlockHeaderValid] = Left(HeaderNumberError)
+
+ override def validateHeaderOnly(blockHeader: BlockHeader) = Left(HeaderNumberError)
+ }
override val ommersValidator: OmmersValidator =
(_: ByteString, _: BigInt, _: Seq[BlockHeader], _: GetBlockHeaderByHash, _: GetNBlocksBack) =>
diff --git a/src/test/scala/io/iohk/ethereum/blockchain/sync/EtcPeerManagerFake.scala b/src/test/scala/io/iohk/ethereum/blockchain/sync/EtcPeerManagerFake.scala
index 7a610eace6..edbfe6ec6e 100644
--- a/src/test/scala/io/iohk/ethereum/blockchain/sync/EtcPeerManagerFake.scala
+++ b/src/test/scala/io/iohk/ethereum/blockchain/sync/EtcPeerManagerFake.scala
@@ -105,13 +105,13 @@ object EtcPeerManagerFake {
case sendMsg @ EtcPeerManagerActor.SendMessage(rawMsg, peerId) =>
requests.onNext(sendMsg)
val response = rawMsg.underlyingMsg match {
- case GetBlockHeaders(startingBlock, maxHeaders, _, false) =>
- val blockMatchesStart: Block => Boolean = block =>
- startingBlock.fold(nr => block.number == nr, hash => block.hash == hash)
+ case GetBlockHeaders(startingBlock, maxHeaders, skip, false) =>
val headers = blocks.tails
- .find(_.headOption.exists(blockMatchesStart))
+ .find(_.headOption.exists(blockMatchesStart(_, startingBlock)))
.toList
.flatten
+ .zipWithIndex
+ .collect { case (block, index) if index % (skip + 1) == 0 => block }
.take(maxHeaders.toInt)
.map(_.header)
BlockHeaders(headers)
@@ -132,5 +132,8 @@ object EtcPeerManagerFake {
}
this
}
+
+ def blockMatchesStart(block: Block, startingBlock: Either[BigInt, ByteString]): Boolean =
+ startingBlock.fold(nr => block.number == nr, hash => block.hash == hash)
}
}
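A worked sketch of the skip-selection added above (illustrative values only): with `skip = 2`, a `GetBlockHeaders` response should contain every third block starting from the requested one, capped at `maxHeaders`.

    val skip = 2
    val maxHeaders = 3
    val candidateNumbers = (10 to 20).toList

    val selected = candidateNumbers.zipWithIndex
      .collect { case (number, index) if index % (skip + 1) == 0 => number }
      .take(maxHeaders)

    assert(selected == List(10, 13, 16))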
diff --git a/src/test/scala/io/iohk/ethereum/blockchain/sync/SyncControllerSpec.scala b/src/test/scala/io/iohk/ethereum/blockchain/sync/SyncControllerSpec.scala
index 746377dd51..9d399d5625 100644
--- a/src/test/scala/io/iohk/ethereum/blockchain/sync/SyncControllerSpec.scala
+++ b/src/test/scala/io/iohk/ethereum/blockchain/sync/SyncControllerSpec.scala
@@ -6,10 +6,10 @@ import akka.testkit.{ExplicitlyTriggeredScheduler, TestActorRef, TestProbe}
import akka.util.ByteString
import com.typesafe.config.ConfigFactory
import io.iohk.ethereum.blockchain.sync.fast.FastSync.SyncState
-import io.iohk.ethereum.consensus.TestConsensus
import io.iohk.ethereum.consensus.blocks.CheckpointBlockGenerator
-import io.iohk.ethereum.consensus.validators.BlockHeaderError.HeaderPoWError
-import io.iohk.ethereum.consensus.validators.{BlockHeaderValid, BlockHeaderValidator, Validators}
+import io.iohk.ethereum.consensus.validators.BlockHeaderError.{HeaderParentNotFoundError, HeaderPoWError}
+import io.iohk.ethereum.consensus.validators.{BlockHeaderError, BlockHeaderValid, BlockHeaderValidator, Validators}
+import io.iohk.ethereum.consensus.{GetBlockHeaderByHash, TestConsensus}
import io.iohk.ethereum.domain._
import io.iohk.ethereum.ledger.Ledger
import io.iohk.ethereum.ledger.Ledger.VMImpl
@@ -118,9 +118,17 @@ class SyncControllerSpec
}
it should "handle blocks that fail validation" in withTestSetup(
- new Mocks.MockValidatorsAlwaysSucceed {
- override val blockHeaderValidator: BlockHeaderValidator = { (blockHeader, getBlockHeaderByHash) =>
- Left(HeaderPoWError)
+ validators = new Mocks.MockValidatorsAlwaysSucceed {
+ override val blockHeaderValidator: BlockHeaderValidator = new BlockHeaderValidator {
+ override def validate(
+ blockHeader: BlockHeader,
+ getBlockHeaderByHash: GetBlockHeaderByHash
+ ): Either[BlockHeaderError, BlockHeaderValid] = {
+ Left(HeaderPoWError)
+ }
+
+ override def validateHeaderOnly(blockHeader: BlockHeader): Either[BlockHeaderError, BlockHeaderValid] =
+ Left(HeaderPoWError)
}
}
) { testSetup =>
@@ -146,14 +154,37 @@ class SyncControllerSpec
eventually {
someTimePasses()
val syncState = storagesInstance.storages.fastSyncStateStorage.getSyncState().get
- syncState.bestBlockHeaderNumber shouldBe (defaultStateBeforeNodeRestart.bestBlockHeaderNumber - syncConfig.fastSyncBlockValidationN)
- syncState.nextBlockToFullyValidate shouldBe (defaultStateBeforeNodeRestart.bestBlockHeaderNumber - syncConfig.fastSyncBlockValidationN + 1)
+
+ // Header validation failed when downloading skeleton headers.
+ // Sync state remains the same and the peer is blacklisted.
+ syncState.pivotBlock shouldBe defaultPivotBlockHeader
+ syncState.bestBlockHeaderNumber shouldBe (defaultStateBeforeNodeRestart.bestBlockHeaderNumber)
+ syncState.nextBlockToFullyValidate shouldBe (defaultStateBeforeNodeRestart.bestBlockHeaderNumber + 1)
syncState.blockBodiesQueue.isEmpty shouldBe true
syncState.receiptsQueue.isEmpty shouldBe true
}
}
- it should "rewind fast-sync state if received header have no known parent" in withTestSetup() { testSetup =>
+ it should "rewind fast-sync state if received header have no known parent" in withTestSetup(
+ validators = new Mocks.MockValidatorsAlwaysSucceed {
+ override val blockHeaderValidator: BlockHeaderValidator = new BlockHeaderValidator {
+ val invalidBlockNumber = 399510
+ override def validate(
+ blockHeader: BlockHeader,
+ getBlockHeaderByHash: GetBlockHeaderByHash
+ ): Either[BlockHeaderError, BlockHeaderValid] = {
+ if (blockHeader.number == invalidBlockNumber) {
+ Left(HeaderParentNotFoundError)
+ } else {
+ Right(BlockHeaderValid)
+ }
+ }
+
+ override def validateHeaderOnly(blockHeader: BlockHeader): Either[BlockHeaderError, BlockHeaderValid] =
+ Right(BlockHeaderValid)
+ }
+ }
+ ) { testSetup =>
import testSetup._
startWithState(defaultStateBeforeNodeRestart)
@@ -161,14 +192,10 @@ class SyncControllerSpec
val handshakedPeers = HandshakedPeers(singlePeer)
- val newBlocks = Seq(
- defaultPivotBlockHeader.copy(
- number = defaultStateBeforeNodeRestart.bestBlockHeaderNumber + 1,
- parentHash = ByteString(1, 2, 3)
- )
- )
+ val blockHeaders =
+ getHeaders(defaultStateBeforeNodeRestart.bestBlockHeaderNumber + 1, 10)
- setupAutoPilot(etcPeerManager, handshakedPeers, defaultPivotBlockHeader, BlockchainData(newBlocks))
+ setupAutoPilot(etcPeerManager, handshakedPeers, defaultPivotBlockHeader, BlockchainData(blockHeaders))
val watcher = TestProbe()
watcher.watch(syncController)
@@ -176,8 +203,12 @@ class SyncControllerSpec
eventually {
someTimePasses()
val syncState = storagesInstance.storages.fastSyncStateStorage.getSyncState().get
- syncState.bestBlockHeaderNumber shouldBe (defaultStateBeforeNodeRestart.bestBlockHeaderNumber - syncConfig.fastSyncBlockValidationN)
- syncState.nextBlockToFullyValidate shouldBe (defaultStateBeforeNodeRestart.bestBlockHeaderNumber - syncConfig.fastSyncBlockValidationN + 1)
+ val invalidBlockNumber = defaultStateBeforeNodeRestart.bestBlockHeaderNumber + 9
+
+ // Header validation failed at header number 399510
+ // Rewind sync state by configured number of headers.
+ syncState.bestBlockHeaderNumber shouldBe (invalidBlockNumber - syncConfig.fastSyncBlockValidationN)
+ syncState.nextBlockToFullyValidate shouldBe (invalidBlockNumber - syncConfig.fastSyncBlockValidationN + 1)
syncState.blockBodiesQueue.isEmpty shouldBe true
syncState.receiptsQueue.isEmpty shouldBe true
}
@@ -190,7 +221,7 @@ class SyncControllerSpec
syncController ! SyncProtocol.Start
- val handshakedPeers = HandshakedPeers(singlePeer)
+ val handshakedPeers = HandshakedPeers(twoAcceptedPeers)
val watcher = TestProbe()
watcher.watch(syncController)
@@ -202,9 +233,9 @@ class SyncControllerSpec
// Send block that is way forward, we should ignore that block and blacklist that peer
val futureHeaders = Seq(defaultPivotBlockHeader.copy(number = defaultPivotBlockHeader.number + 20))
- val futureHeadersMessage = PeerRequestHandler.ResponseReceived(peer1, BlockHeaders(futureHeaders), 2L)
+ val futureHeadersMessage = PeerRequestHandler.ResponseReceived(peer2, BlockHeaders(futureHeaders), 2L)
implicit val ec = system.dispatcher
- system.scheduler.scheduleAtFixedRate(0.seconds, 0.1.second, fast, futureHeadersMessage)
+ system.scheduler.scheduleAtFixedRate(0.seconds, 0.5.seconds, fast, futureHeadersMessage)
eventually {
someTimePasses()
@@ -219,14 +250,20 @@ class SyncControllerSpec
}
it should "update pivot block if pivot fail" in withTestSetup(new Mocks.MockValidatorsAlwaysSucceed {
- override val blockHeaderValidator: BlockHeaderValidator = { (blockHeader, _) =>
- {
+ override val blockHeaderValidator: BlockHeaderValidator = new BlockHeaderValidator {
+ override def validate(
+ blockHeader: BlockHeader,
+ getBlockHeaderByHash: GetBlockHeaderByHash
+ ): Either[BlockHeaderError, BlockHeaderValid] = {
if (blockHeader.number != 399500 + 10) {
Right(BlockHeaderValid)
} else {
- Left(HeaderPoWError)
+ Left(HeaderParentNotFoundError)
}
}
+
+ override def validateHeaderOnly(blockHeader: BlockHeader): Either[BlockHeaderError, BlockHeaderValid] =
+ Right(BlockHeaderValid)
}
}) { testSetup =>
import testSetup._
@@ -555,16 +592,13 @@ class SyncControllerSpec
case SendMessage(msg: GetBlockHeadersEnc, peer) =>
val underlyingMessage = msg.underlyingMsg
- if (underlyingMessage.maxHeaders == 1) {
+ val requestedBlockNumber = underlyingMessage.block.swap.toOption.get
+ if (requestedBlockNumber == pivotHeader.number) {
// pivot block
sender ! MessageFromPeer(BlockHeaders(Seq(pivotHeader)), peer)
} else {
- if (!onlyPivot) {
- val start = msg.underlyingMsg.block.swap.toOption.get
- val stop = start + msg.underlyingMsg.maxHeaders
- val headers = (start until stop).flatMap(i => blockchainData.headers.get(i))
- sender ! MessageFromPeer(BlockHeaders(headers), peer)
- }
+ val headers = generateBlockHeaders(underlyingMessage, blockchainData)
+ sender ! MessageFromPeer(BlockHeaders(headers), peer)
}
this
@@ -630,6 +664,19 @@ class SyncControllerSpec
}
}
+ private def generateBlockHeaders(
+ underlyingMessage: GetBlockHeaders,
+ blockchainData: BlockchainData
+ ): Seq[BlockHeader] = {
+ val start = underlyingMessage.block.swap.toOption.get
+ val stop = start + underlyingMessage.maxHeaders * (underlyingMessage.skip + 1)
+
+ (start until stop)
+ .flatMap(i => blockchainData.headers.get(i))
+ .zipWithIndex
+ .collect { case (header, index) if index % (underlyingMessage.skip + 1) == 0 => header }
+ }
+
// scalastyle:off method.length parameter.number
def setupAutoPilot(
testProbe: TestProbe,
diff --git a/src/test/scala/io/iohk/ethereum/blockchain/sync/TestSyncConfig.scala b/src/test/scala/io/iohk/ethereum/blockchain/sync/TestSyncConfig.scala
index 25d2b8d477..08db15a76c 100644
--- a/src/test/scala/io/iohk/ethereum/blockchain/sync/TestSyncConfig.scala
+++ b/src/test/scala/io/iohk/ethereum/blockchain/sync/TestSyncConfig.scala
@@ -43,7 +43,8 @@ trait TestSyncConfig extends SyncConfigBuilder {
stateSyncBloomFilterSize = 1000,
stateSyncPersistBatchSize = 1000,
pivotBlockReScheduleInterval = 1.second,
- maxPivotBlockAge = 96
+ maxPivotBlockAge = 96,
+ fastSyncMaxBatchRetries = 3
)
override lazy val syncConfig: SyncConfig = defaultSyncConfig
diff --git a/src/test/scala/io/iohk/ethereum/blockchain/sync/fast/FastSyncBranchResolverActorSpec.scala b/src/test/scala/io/iohk/ethereum/blockchain/sync/fast/FastSyncBranchResolverActorSpec.scala
new file mode 100644
index 0000000000..5503bc2494
--- /dev/null
+++ b/src/test/scala/io/iohk/ethereum/blockchain/sync/fast/FastSyncBranchResolverActorSpec.scala
@@ -0,0 +1,362 @@
+package io.iohk.ethereum.blockchain.sync.fast
+
+import java.net.InetSocketAddress
+
+import akka.actor.{ActorRef, ActorSystem}
+import akka.pattern.gracefulStop
+import akka.testkit.TestActor.AutoPilot
+import akka.testkit.{TestKit, TestProbe}
+import akka.util.{ByteString, Timeout}
+import cats.effect.concurrent.Deferred
+import cats.implicits._
+import io.iohk.ethereum.blockchain.sync.PeerListSupport.PeersMap
+import io.iohk.ethereum.blockchain.sync._
+import io.iohk.ethereum.blockchain.sync.fast.FastSyncBranchResolverActor.{BranchResolvedSuccessful, StartBranchResolver}
+import io.iohk.ethereum.domain.{Block, BlockHeader, ChainWeight}
+import io.iohk.ethereum.network.EtcPeerManagerActor._
+import io.iohk.ethereum.network.PeerEventBusActor.PeerEvent.MessageFromPeer
+import io.iohk.ethereum.network.p2p.messages.PV62.{BlockHeaders, GetBlockHeaders}
+import io.iohk.ethereum.network.p2p.messages.ProtocolVersions
+import io.iohk.ethereum.network.{EtcPeerManagerActor, Peer, PeerId}
+import io.iohk.ethereum.utils.Logger
+import io.iohk.ethereum.{BlockHelpers, NormalPatience, WithActorSystemShutDown}
+import monix.eval.Task
+import monix.execution.Scheduler
+import monix.reactive.Observable
+import monix.reactive.subjects.{ReplaySubject, Subject}
+import org.scalatest.concurrent.ScalaFutures
+import org.scalatest.freespec.AnyFreeSpecLike
+
+import scala.concurrent.duration.DurationInt
+import scala.language.postfixOps
+import scala.util.Random
+import io.iohk.ethereum.blockchain.sync.fast.FastSyncBranchResolverActor.BranchResolutionFailed
+import io.iohk.ethereum.blockchain.sync.fast.FastSyncBranchResolverActor.BranchResolutionFailed.NoCommonBlockFound
+
+class FastSyncBranchResolverActorSpec
+ extends TestKit(ActorSystem("FastSyncBranchResolver_testing"))
+ with AnyFreeSpecLike
+ with ScalaFutures
+ with NormalPatience
+ with WithActorSystemShutDown { self =>
+ implicit val timeout: Timeout = Timeout(30.seconds)
+
+ import FastSyncBranchResolverActorSpec._
+
+ "FastSyncBranchResolver" - {
+ "fetch headers from the new master peer" - {
+ "the chain is repaired from the first request to the new master pair and then the last two blocks are removed" in new TestSetup {
+ override implicit lazy val system = self.system
+ override implicit val scheduler = Scheduler(system.dispatcher)
+
+ val sender = TestProbe("sender")
+
+ val commonBlocks: List[Block] = BlockHelpers.generateChain(
+ 5,
+ BlockHelpers.genesis,
+ block => block
+ )
+
+ val blocksSaved: List[Block] = commonBlocks :++ BlockHelpers.generateChain(
+ 1,
+ commonBlocks.last,
+ block => block
+ )
+
+ val blocksSavedInPeer: List[Block] = commonBlocks :++ BlockHelpers.generateChain(
+ 2,
+ commonBlocks.last,
+ block => block
+ )
+
+ val firstBatchBlockHeaders: List[Block] =
+ blocksSavedInPeer.slice(blocksSavedInPeer.size - syncConfig.blockHeadersPerRequest, blocksSavedInPeer.size)
+
+ val blocksSentFromPeer: Map[Int, List[Block]] = Map(1 -> firstBatchBlockHeaders)
+
+ saveBlocks(blocksSaved)
+ val etcPeerManager = createEtcPeerManager(handshakedPeers, blocksSentFromPeer)
+ val fastSyncBranchResolver =
+ creatFastSyncBranchResolver(sender.ref, etcPeerManager, CacheBasedBlacklist.empty(BlacklistMaxElements))
+
+ val expectation: PartialFunction[Any, BranchResolvedSuccessful] = {
+ case r @ BranchResolvedSuccessful(num, _) if num == BigInt(5) => r
+ }
+
+ val response = (for {
+ _ <- Task(sender.send(fastSyncBranchResolver, StartBranchResolver))
+ response <- Task(sender.expectMsgPF()(expectation))
+ _ <- Task(stopController(fastSyncBranchResolver))
+ } yield response).runSyncUnsafe()
+ assert(getBestPeers.contains(response.masterPeer))
+ }
+
+ "The chain is repaired doing binary searching with the new master peer and then remove the last invalid blocks" - {
+ "highest common block is in the middle" in new TestSetup {
+ override implicit lazy val system = self.system
+ override implicit val scheduler = Scheduler(system.dispatcher)
+
+ val sender = TestProbe("sender")
+
+ val commonBlocks: List[Block] = BlockHelpers.generateChain(5, BlockHelpers.genesis)
+ val blocksSaved: List[Block] = commonBlocks :++ BlockHelpers.generateChain(5, commonBlocks.last)
+ val blocksSavedInPeer: List[Block] = commonBlocks :++ BlockHelpers.generateChain(6, commonBlocks.last)
+
+ val firstBatchBlockHeaders =
+ blocksSavedInPeer.slice(blocksSavedInPeer.size - syncConfig.blockHeadersPerRequest, blocksSavedInPeer.size)
+
+ val blocksSentFromPeer: Map[Int, List[Block]] = Map(
+ 1 -> firstBatchBlockHeaders,
+ 2 -> List(blocksSavedInPeer.get(5).get),
+ 3 -> List(blocksSavedInPeer.get(7).get),
+ 4 -> List(blocksSavedInPeer.get(5).get),
+ 5 -> List(blocksSavedInPeer.get(6).get)
+ )
+
+ saveBlocks(blocksSaved)
+ val etcPeerManager = createEtcPeerManager(handshakedPeers, blocksSentFromPeer)
+ val fastSyncBranchResolver =
+ creatFastSyncBranchResolver(sender.ref, etcPeerManager, CacheBasedBlacklist.empty(BlacklistMaxElements))
+
+ val expectation: PartialFunction[Any, BranchResolvedSuccessful] = {
+ case r @ BranchResolvedSuccessful(num, _) if num == BigInt(5) => r
+ }
+
+ val response = (for {
+ _ <- Task(sender.send(fastSyncBranchResolver, StartBranchResolver))
+ response <- Task(sender.expectMsgPF()(expectation))
+ _ <- Task(stopController(fastSyncBranchResolver))
+ } yield response).runSyncUnsafe()
+ assert(getBestPeers.contains(response.masterPeer))
+ }
+ "highest common block is in the first half" in new TestSetup {
+ override implicit lazy val system = self.system
+ override implicit val scheduler = Scheduler(system.dispatcher)
+
+ val sender = TestProbe("sender")
+
+ val commonBlocks: List[Block] = BlockHelpers.generateChain(3, BlockHelpers.genesis)
+ val blocksSaved: List[Block] = commonBlocks :++ BlockHelpers.generateChain(7, commonBlocks.last)
+ val blocksSavedInPeer: List[Block] = commonBlocks :++ BlockHelpers.generateChain(8, commonBlocks.last)
+
+ val firstBatchBlockHeaders =
+ blocksSavedInPeer.slice(blocksSavedInPeer.size - syncConfig.blockHeadersPerRequest, blocksSavedInPeer.size)
+
+ val blocksSentFromPeer: Map[Int, List[Block]] = Map(
+ 1 -> firstBatchBlockHeaders,
+ 2 -> List(blocksSavedInPeer.get(5).get),
+ 3 -> List(blocksSavedInPeer.get(2).get),
+ 4 -> List(blocksSavedInPeer.get(3).get),
+ 5 -> List(blocksSavedInPeer.get(3).get),
+ 6 -> List(blocksSavedInPeer.get(4).get)
+ )
+
+ saveBlocks(blocksSaved)
+ val etcPeerManager = createEtcPeerManager(handshakedPeers, blocksSentFromPeer)
+ val fastSyncBranchResolver =
+ creatFastSyncBranchResolver(sender.ref, etcPeerManager, CacheBasedBlacklist.empty(BlacklistMaxElements))
+
+ val expectation: PartialFunction[Any, BranchResolvedSuccessful] = {
+ case r @ BranchResolvedSuccessful(num, _) if num == BigInt(3) => r
+ }
+
+ val response = (for {
+ _ <- Task(sender.send(fastSyncBranchResolver, StartBranchResolver))
+ response <- Task(sender.expectMsgPF()(expectation))
+ _ <- Task(stopController(fastSyncBranchResolver))
+ } yield response).runSyncUnsafe()
+ assert(getBestPeers.contains(response.masterPeer))
+ }
+
+ "highest common block is in the second half" in new TestSetup {
+ override implicit lazy val system = self.system
+ override implicit val scheduler = Scheduler(system.dispatcher)
+
+ val sender = TestProbe("sender")
+
+ val commonBlocks: List[Block] = BlockHelpers.generateChain(6, BlockHelpers.genesis)
+ val blocksSaved: List[Block] = commonBlocks :++ BlockHelpers.generateChain(4, commonBlocks.last)
+ val blocksSavedInPeer: List[Block] = commonBlocks :++ BlockHelpers.generateChain(5, commonBlocks.last)
+
+ val firstBatchBlockHeaders =
+ blocksSavedInPeer.slice(blocksSavedInPeer.size - syncConfig.blockHeadersPerRequest, blocksSavedInPeer.size)
+
+ val blocksSentFromPeer: Map[Int, List[Block]] = Map(
+ 1 -> firstBatchBlockHeaders,
+ 2 -> List(blocksSavedInPeer.get(5).get),
+ 3 -> List(blocksSavedInPeer.get(7).get),
+ 4 -> List(blocksSavedInPeer.get(5).get),
+ 5 -> List(blocksSavedInPeer.get(6).get)
+ )
+
+ saveBlocks(blocksSaved)
+ val etcPeerManager = createEtcPeerManager(handshakedPeers, blocksSentFromPeer)
+ val fastSyncBranchResolver =
+ creatFastSyncBranchResolver(sender.ref, etcPeerManager, CacheBasedBlacklist.empty(BlacklistMaxElements))
+
+ val expectation: PartialFunction[Any, BranchResolvedSuccessful] = {
+ case r @ BranchResolvedSuccessful(num, _) if num == BigInt(6) => r
+ }
+
+ val response = (for {
+ _ <- Task(sender.send(fastSyncBranchResolver, StartBranchResolver))
+ response <- Task(sender.expectMsgPF()(expectation))
+ _ <- Task(stopController(fastSyncBranchResolver))
+ } yield response).runSyncUnsafe()
+ assert(getBestPeers.contains(response.masterPeer))
+ }
+ }
+
+ "No common block is found" in new TestSetup {
+ override implicit lazy val system = self.system
+ override implicit val scheduler = Scheduler(system.dispatcher)
+
+ val sender = TestProbe("sender")
+
+ // same genesis block but no other common blocks
+ val blocksSaved: List[Block] = BlockHelpers.generateChain(5, BlockHelpers.genesis)
+ val blocksSavedInPeer: List[Block] = BlockHelpers.generateChain(6, BlockHelpers.genesis)
+
+ val firstBatchBlockHeaders =
+ blocksSavedInPeer.slice(blocksSavedInPeer.size - syncConfig.blockHeadersPerRequest, blocksSavedInPeer.size)
+
+ val blocksSentFromPeer: Map[Int, List[Block]] = Map(
+ 1 -> firstBatchBlockHeaders,
+ 2 -> List(blocksSavedInPeer.get(3).get),
+ 3 -> List(blocksSavedInPeer.get(1).get),
+ 4 -> List(blocksSavedInPeer.get(1).get)
+ )
+
+ saveBlocks(blocksSaved)
+ val etcPeerManager = createEtcPeerManager(handshakedPeers, blocksSentFromPeer)
+ val fastSyncBranchResolver =
+ creatFastSyncBranchResolver(sender.ref, etcPeerManager, CacheBasedBlacklist.empty(BlacklistMaxElements))
+
+ log.debug(s"*** peers: ${handshakedPeers.map(p => (p._1.id, p._2.maxBlockNumber))}")
+ (for {
+ _ <- Task(sender.send(fastSyncBranchResolver, StartBranchResolver))
+ response <- Task(sender.expectMsg(BranchResolutionFailed(NoCommonBlockFound)))
+ _ <- Task(stopController(fastSyncBranchResolver))
+ } yield response).runSyncUnsafe()
+ }
+ }
+ }
+
+ trait TestSetup extends EphemBlockchainTestSetup with TestSyncConfig with TestSyncPeers {
+
+ def peerId(number: Int): PeerId = PeerId(s"peer_$number")
+ def getPeer(id: PeerId): Peer =
+ Peer(new InetSocketAddress("127.0.0.1", 0), TestProbe(id.value).ref, incomingConnection = false)
+ def getPeerInfo(peer: Peer, protocolVersion: Int = ProtocolVersions.PV64): PeerInfo = {
+ val status =
+ RemoteStatus(
+ protocolVersion,
+ 1,
+ ChainWeight.totalDifficultyOnly(1),
+ ByteString(s"${peer.id}_bestHash"),
+ ByteString("unused")
+ )
+ PeerInfo(
+ status,
+ forkAccepted = true,
+ chainWeight = status.chainWeight,
+ maxBlockNumber = Random.between(1, 10),
+ bestBlockHash = status.bestHash
+ )
+ }
+
+ val handshakedPeers: PeersMap = (0 to 5).toList.map((peerId _).andThen(getPeer)).fproduct(getPeerInfo(_)).toMap
+
+ def saveBlocks(blocks: List[Block]): Unit = {
+ blocks.foreach(block => blockchain.save(block, Nil, ChainWeight.totalDifficultyOnly(1), saveAsBestBlock = true))
+ }
+
+ def createEtcPeerManager(peers: PeersMap, blocks: Map[Int, List[Block]])(implicit
+ scheduler: Scheduler
+ ): ActorRef = {
+ val etcPeerManager = TestProbe("etc_peer_manager")
+ val autoPilot =
+ new EtcPeerManagerAutoPilot(
+ responsesSubject,
+ peersConnectedDeferred,
+ peers,
+ blocks
+ )
+ etcPeerManager.setAutoPilot(autoPilot)
+ etcPeerManager.ref
+ }
+
+ def creatFastSyncBranchResolver(fastSync: ActorRef, etcPeerManager: ActorRef, blacklist: Blacklist): ActorRef =
+ system.actorOf(
+ FastSyncBranchResolverActor.props(
+ fastSync = fastSync,
+ peerEventBus = TestProbe("peer_event_bus").ref,
+ etcPeerManager = etcPeerManager,
+ blockchain = blockchain,
+ blacklist = blacklist,
+ syncConfig = syncConfig,
+ appStateStorage = storagesInstance.storages.appStateStorage,
+ scheduler = system.scheduler
+ )
+ )
+
+ def stopController(actorRef: ActorRef): Unit = {
+ awaitCond(gracefulStop(actorRef, actorAskTimeout.duration).futureValue)
+ }
+
+ def getBestPeers: List[Peer] = {
+ val maxBlock = handshakedPeers.toList.map { case (_, peerInfo) => peerInfo.maxBlockNumber }.max
+ handshakedPeers.toList.filter { case (_, peerInfo) => peerInfo.maxBlockNumber == maxBlock }.map(_._1)
+ }
+ }
+}
+
+object FastSyncBranchResolverActorSpec extends Logger {
+
+ private val BlacklistMaxElements: Int = 100
+
+ private val responsesSubject: Subject[MessageFromPeer, MessageFromPeer] = ReplaySubject()
+ private val peersConnectedDeferred = Deferred.unsafe[Task, Unit]
+
+ var responses: Observable[MessageFromPeer] = responsesSubject
+
+ def fetchedHeaders: Observable[Seq[BlockHeader]] = {
+ responses
+ .collect { case MessageFromPeer(BlockHeaders(headers), _) =>
+ headers
+ }
+ }
+
+ class EtcPeerManagerAutoPilot(
+ responses: Subject[MessageFromPeer, MessageFromPeer],
+ peersConnected: Deferred[Task, Unit],
+ peers: PeersMap,
+ blocks: Map[Int, List[Block]]
+ )(implicit scheduler: Scheduler)
+ extends AutoPilot {
+
+ var blockIndex = 0
+ lazy val blocksSetSize = blocks.size
+
+ def run(sender: ActorRef, msg: Any): EtcPeerManagerAutoPilot = {
+ msg match {
+ case EtcPeerManagerActor.GetHandshakedPeers =>
+ sender ! EtcPeerManagerActor.HandshakedPeers(peers)
+ peersConnected.complete(()).onErrorHandle(_ => ()).runSyncUnsafe()
+ case sendMsg @ EtcPeerManagerActor.SendMessage(rawMsg, peerId) =>
+ val response = rawMsg.underlyingMsg match {
+ case GetBlockHeaders(_, _, _, false) =>
+ if (blockIndex < blocksSetSize)
+ blockIndex += 1
+ BlockHeaders(blocks.get(blockIndex).map(_.map(_.header)).getOrElse(Nil))
+ }
+ val theResponse = MessageFromPeer(response, peerId)
+ sender ! theResponse
+ responses.onNext(theResponse)
+ if (blockIndex == blocksSetSize)
+ responses.onComplete()
+ }
+ this
+ }
+ }
+}
diff --git a/src/test/scala/io/iohk/ethereum/blockchain/sync/fast/FastSyncBranchResolverSpec.scala b/src/test/scala/io/iohk/ethereum/blockchain/sync/fast/FastSyncBranchResolverSpec.scala
new file mode 100644
index 0000000000..f5fcb182de
--- /dev/null
+++ b/src/test/scala/io/iohk/ethereum/blockchain/sync/fast/FastSyncBranchResolverSpec.scala
@@ -0,0 +1,302 @@
+package io.iohk.ethereum.blockchain.sync.fast
+
+import akka.actor.ActorRef
+import akka.util.ByteString
+import io.iohk.ethereum.{BlockHelpers, Fixtures}
+import io.iohk.ethereum.blockchain.sync.fast.BinarySearchSupport._
+import io.iohk.ethereum.blockchain.sync.fast.FastSyncBranchResolver.SearchState
+import io.iohk.ethereum.domain.{Block, BlockHeader, Blockchain, BlockchainImpl}
+import io.iohk.ethereum.network.Peer
+import org.scalamock.scalatest.MockFactory
+import org.scalatest.matchers.must.Matchers
+import org.scalatest.wordspec.AnyWordSpec
+
+import java.net.InetSocketAddress
+
+class FastSyncBranchResolverSpec extends AnyWordSpec with Matchers with MockFactory {
+
+ import Fixtures.Blocks.ValidBlock
+
+ private def blocksMap(amount: Int, parent: Block): Map[BigInt, Block] = {
+ BlockHelpers.generateChain(amount, parent).map(b => (b.number, b)).toMap
+ }
+
+ private def headersMap(amount: Int, parent: Block): Map[BigInt, BlockHeader] = {
+ BlockHelpers.generateChain(amount, parent).map(b => (b.number, b.header)).toMap
+ }
+
+ private def headersList(blocksMap: Map[BigInt, Block]): List[BlockHeader] =
+ blocksMap.values.map(_.header).toList
+
+ "FastSyncBranchResolver" must {
+ "calculate childOf block number" in {
+ import FastSyncBranchResolver.childOf
+ assert(childOf(0) === 1)
+ assert(childOf(6) === 7)
+ }
+ "calculate parentOf block number" in {
+ import FastSyncBranchResolver.parentOf
+ assert(parentOf(7) === 6)
+ }
+ "correctly discard all blocks after a certain block number" in {
+ val mockedBlockchain = mock[BlockchainImpl]
+
+ val headers = headersMap(amount = 3, parent = Block(ValidBlock.header.copy(number = 97), ValidBlock.body))
+
+ inSequence {
+ (mockedBlockchain.getBestBlockNumber _).expects().returning(BigInt(100)).once()
+ (mockedBlockchain.getBlockHeaderByNumber _).expects(BigInt(100)).returning(headers.get(100))
+ (mockedBlockchain.removeBlock _).expects(headers(100).hash, false).returning(())
+ (mockedBlockchain.getBlockHeaderByNumber _).expects(BigInt(99)).returning(headers.get(99))
+ (mockedBlockchain.removeBlock _).expects(headers(99).hash, false).returning(())
+ (mockedBlockchain.getBlockHeaderByNumber _).expects(BigInt(98)).returning(headers.get(98))
+ (mockedBlockchain.removeBlock _).expects(headers(98).hash, false).returning(())
+ }
+
+ val resolver = new FastSyncBranchResolver {
+ override val blockchain: Blockchain = mockedBlockchain
+ }
+ resolver.discardBlocksAfter(97)
+ }
+ }
+
+ "RecentBlocksSearch" must {
+ "find highest common block header" when {
+ "it's at the very end of the search range" in {
+ val mockedBlockchain = mock[BlockchainImpl]
+
+ // our: [..., 97, 98, 99, *100*]
+ // peer: [..., 97, 98, 99, *100*, 101, 102]
+ val startBlock = Block(ValidBlock.header.copy(number = 97), ValidBlock.body)
+ val ourBlocks = blocksMap(amount = 3, parent = startBlock)
+ val peerBlocks = ourBlocks ++ blocksMap(amount = 1, parent = ourBlocks(100))
+
+ (mockedBlockchain.getBlockHeaderByNumber _).expects(BigInt(100)).returns(ourBlocks.get(100).map(_.header))
+
+ val recentBlocksSearch: RecentBlocksSearch = new RecentBlocksSearch(mockedBlockchain)
+ assert(
+ recentBlocksSearch.getHighestCommonBlock(peerBlocks.values.map(_.header).toList, 100) === Some(BigInt(100))
+ )
+ }
+ "it's at the very beginning of the search range" in {
+ val mockedBlockchain = mock[BlockchainImpl]
+
+ val ourBestBlock = 100
+ val highestCommonBlock = 97
+
+ // our: [..., *97*, 98, 99, 100]
+ // peer: [..., *97*, 98x, 99x, 100, 101x]
+ val startBlock = Block(ValidBlock.header.copy(number = 96), ValidBlock.body)
+ val ourBlocks = blocksMap(amount = 4, parent = startBlock) // 97, 98, 99, 100
+ val peerBlocks = blocksMap(amount = 4, parent = ourBlocks(97)) // 98, 99, 100, 101
+
+ inSequence {
+ (mockedBlockchain.getBlockHeaderByNumber _).expects(BigInt(100)).returns(ourBlocks.get(100).map(_.header))
+ (mockedBlockchain.getBlockHeaderByNumber _).expects(BigInt(99)).returns(ourBlocks.get(99).map(_.header))
+ (mockedBlockchain.getBlockHeaderByNumber _).expects(BigInt(98)).returns(ourBlocks.get(98).map(_.header))
+ (mockedBlockchain.getBlockHeaderByNumber _).expects(BigInt(97)).returns(ourBlocks.get(97).map(_.header))
+ }
+
+ val recentBlocksSearch: RecentBlocksSearch = new RecentBlocksSearch(mockedBlockchain)
+ assert(
+ recentBlocksSearch.getHighestCommonBlock(headersList(peerBlocks), ourBestBlock) === Some(
+ BigInt(highestCommonBlock)
+ )
+ )
+ }
+ "it's somewhere in the middle" in {
+ val mockedBlockchain = mock[BlockchainImpl]
+
+ val ourBestBlock = 100
+ val highestCommonBlock = 98
+
+ // our: [..., 95, 96, 97, *98*, 99, 100]
+ // peer: [..., 95, 96, 97, *98*, 99x, 100x, 101x]
+ val startBlock = Block(ValidBlock.header.copy(number = 95), ValidBlock.body)
+ val commonBlocks = blocksMap(amount = 3, parent = startBlock)
+ val ourBlocks = commonBlocks ++ blocksMap(amount = 2, parent = commonBlocks(highestCommonBlock))
+ val peerBlocks = blocksMap(amount = 3, parent = commonBlocks(highestCommonBlock))
+
+ inSequence {
+ (mockedBlockchain.getBlockHeaderByNumber _).expects(BigInt(100)).returns(ourBlocks.get(100).map(_.header))
+ (mockedBlockchain.getBlockHeaderByNumber _).expects(BigInt(99)).returns(ourBlocks.get(99).map(_.header))
+ (mockedBlockchain.getBlockHeaderByNumber _).expects(BigInt(98)).returns(ourBlocks.get(98).map(_.header))
+ }
+
+ val recentBlocksSearch: RecentBlocksSearch = new RecentBlocksSearch(mockedBlockchain)
+ assert(recentBlocksSearch.getHighestCommonBlock(headersList(peerBlocks), ourBestBlock) === Some(BigInt(98)))
+ }
+ }
+ "return None if there's no common block header" in {
+ val mockedBlockchain = mock[BlockchainImpl]
+
+ val ourBestBlock = 100
+
+ // our: [..., 95, 96, 97, 98, 99, 100]
+ // peer: [..., 95x, 96x, 97x, 98x, 99x, 100x]
+ val startBlock = Block(ValidBlock.header.copy(number = 95), ValidBlock.body)
+ val divergedStartBlock = Block(ValidBlock.header.copy(number = 95, nonce = ByteString("foo")), ValidBlock.body)
+ val ourBlocks = blocksMap(amount = 5, parent = startBlock)
+ val peerBlocks = blocksMap(amount = 5, parent = divergedStartBlock)
+
+ (mockedBlockchain.getBlockHeaderByNumber _).expects(BigInt(100)).returns(ourBlocks.get(100).map(_.header))
+ (mockedBlockchain.getBlockHeaderByNumber _).expects(BigInt(99)).returns(ourBlocks.get(99).map(_.header))
+ (mockedBlockchain.getBlockHeaderByNumber _).expects(BigInt(98)).returns(ourBlocks.get(98).map(_.header))
+ (mockedBlockchain.getBlockHeaderByNumber _).expects(BigInt(97)).returns(ourBlocks.get(97).map(_.header))
+ (mockedBlockchain.getBlockHeaderByNumber _).expects(BigInt(96)).returns(ourBlocks.get(96).map(_.header))
+
+ val recentBlocksSearch: RecentBlocksSearch = new RecentBlocksSearch(mockedBlockchain)
+ assert(recentBlocksSearch.getHighestCommonBlock(headersList(peerBlocks), ourBestBlock) === None)
+ }
+ }
+
+ "BinarySearch" must {
+ "determine correct block number to request" in {
+ // we are requesting the child block header of the middle block header, so we're expecting (middle + 1)
+ assert(BinarySearchSupport.blockHeaderNumberToRequest(3, 7) === 6)
+ // if there is no "middle", we take the lower number
+ assert(BinarySearchSupport.blockHeaderNumberToRequest(3, 6) === 5)
+ assert(BinarySearchSupport.blockHeaderNumberToRequest(3, 4) === 4)
+ assert(BinarySearchSupport.blockHeaderNumberToRequest(4, 4) === 5)
+ }
+ "complete search with highest common block number" in {
+ val ourBestBlock = 10
+ val highestCommonBlock = 6
+
+ val commonBlocks: List[Block] = BlockHelpers.generateChain(highestCommonBlock, BlockHelpers.genesis)
+ val blocksSaved: List[Block] =
+ commonBlocks :++ BlockHelpers.generateChain(ourBestBlock - highestCommonBlock, commonBlocks.last)
+ val blocksSavedInPeer: List[Block] =
+ commonBlocks :++ BlockHelpers.generateChain(ourBestBlock + 1 - highestCommonBlock, commonBlocks.last)
+
+ val dummyPeer = Peer(new InetSocketAddress("foo", 1), ActorRef.noSender, false, None, 0)
+
+ val initialSearchState = SearchState(1, 10, dummyPeer)
+ val ours = blocksSaved.map(b => (b.number, b)).toMap
+ val peer = blocksSavedInPeer.map(b => (b.number, b)).toMap
+
+ val req1 = BinarySearchSupport.blockHeaderNumberToRequest(
+ initialSearchState.minBlockNumber,
+ initialSearchState.maxBlockNumber
+ )
+ assert(req1 === 6)
+
+ // checking whether [our:5] is the parent of [peer:6]
+ // -> yes, so block 5 is common and we continue to look for higher common blocks
+ val s1 = BinarySearchSupport.validateBlockHeaders(
+ ours(req1 - 1).header,
+ peer(req1).header,
+ initialSearchState
+ ) match {
+ case ContinueBinarySearch(searchState) => searchState
+ case _ => fail()
+ }
+ assert(s1 === SearchState(5, 10, dummyPeer))
+
+ val req2 = BinarySearchSupport.blockHeaderNumberToRequest(s1.minBlockNumber, s1.maxBlockNumber)
+ assert(req2 === 8)
+
+ // checking whether [our:7] is the parent of [peer:8]
+ // -> no, so block 6 becomes the new maximum candidate and we continue searching
+ val s2 = BinarySearchSupport.validateBlockHeaders(
+ ours(req2 - 1).header,
+ peer(req2).header,
+ s1
+ ) match {
+ case ContinueBinarySearch(searchState) => searchState
+ case _ => fail()
+ }
+ assert(s2 === SearchState(5, 6, dummyPeer))
+
+ val req3 = BinarySearchSupport.blockHeaderNumberToRequest(s2.minBlockNumber, s2.maxBlockNumber)
+ assert(req3 === 6)
+
+ // checking whether [our:5] is the parent of [peer:6]
+ // -> yes, and 5 was already the minimum and 6 the maximum, so the only block that could be potentially better is 6
+ // -> so we set both min and max to 6
+ val s3 = BinarySearchSupport.validateBlockHeaders(
+ ours(req3 - 1).header,
+ peer(req3).header,
+ s2
+ ) match {
+ case ContinueBinarySearch(searchState) => searchState
+ case _ => fail()
+ }
+ assert(s3 === SearchState(6, 6, dummyPeer))
+
+ val req4 = BinarySearchSupport.blockHeaderNumberToRequest(s3.minBlockNumber, s3.maxBlockNumber)
+ assert(req4 === 7)
+
+ // checking whether [our:6] is the parent of [peer:7]
+ // -> yes, so 6 is the final result
+ val res = BinarySearchSupport.validateBlockHeaders(
+ ours(req4 - 1).header,
+ peer(req4).header,
+ s3
+ ) match {
+ case BinarySearchCompleted(highestHeader) => highestHeader
+ case _ => fail()
+ }
+ assert(res === BigInt(6))
+ }
+ "complete search with no match" in {
+ val ourBestBlock = 10
+
+ val blocksSaved: List[Block] = BlockHelpers.generateChain(8, BlockHelpers.genesis)
+ val blocksSavedInPeer: List[Block] = BlockHelpers.generateChain(8, BlockHelpers.genesis)
+
+ val dummyPeer = Peer(new InetSocketAddress("foo", 1), ActorRef.noSender, false, None, 0)
+
+ val initialSearchState = SearchState(1, 8, dummyPeer)
+ val ours = blocksSaved.map(b => (b.number, b)).toMap
+ val peer = blocksSavedInPeer.map(b => (b.number, b)).toMap
+
+ val req1 = BinarySearchSupport.blockHeaderNumberToRequest(
+ initialSearchState.minBlockNumber,
+ initialSearchState.maxBlockNumber
+ )
+ assert(req1 === 5)
+
+ // checking whether [our:4] is the parent of [peer:5]
+ // -> no, so block 3 becomes the new maximum candidate
+ val s1 = BinarySearchSupport.validateBlockHeaders(
+ ours(req1 - 1).header,
+ peer(req1).header,
+ initialSearchState
+ ) match {
+ case ContinueBinarySearch(searchState) => searchState
+ case _ => fail()
+ }
+ assert(s1 === SearchState(1, 3, dummyPeer))
+
+ val req2 = BinarySearchSupport.blockHeaderNumberToRequest(s1.minBlockNumber, s1.maxBlockNumber)
+ assert(req2 === 3)
+
+ // checking whether [our:2] is the parent of [peer:3]
+ // -> no, so block 1 becomes the new maximum candidate and we continue searching
+ val s2 = BinarySearchSupport.validateBlockHeaders(
+ ours(req2 - 1).header,
+ peer(req2).header,
+ s1
+ ) match {
+ case ContinueBinarySearch(searchState) => searchState
+ case _ => fail()
+ }
+ assert(s2 === SearchState(1, 1, dummyPeer))
+
+ val req3 = BinarySearchSupport.blockHeaderNumberToRequest(s2.minBlockNumber, s2.maxBlockNumber)
+ assert(req3 === 2)
+
+ // checking whether [our:1] is the parent of [peer:2]
+ // -> no, that means not even block 1 is common
+ val res = BinarySearchSupport.validateBlockHeaders(
+ ours(req3 - 1).header,
+ peer(req3).header,
+ s2
+ )
+
+ assert(res === NoCommonBlock)
+ }
+ }
+
+}
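For the "determine correct block number to request" expectations above, a minimal midpoint rule that reproduces them (illustrative only; the real `BinarySearchSupport.blockHeaderNumberToRequest` is not shown in this patch and may be implemented differently): take the floor midpoint of the range and request its child.

    def midpointChild(min: BigInt, max: BigInt): BigInt = (min + max) / 2 + 1

    assert(midpointChild(3, 7) == 6)
    assert(midpointChild(3, 6) == 5)
    assert(midpointChild(3, 4) == 4)
    assert(midpointChild(4, 4) == 5)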
diff --git a/src/test/scala/io/iohk/ethereum/consensus/validators/BlockHeaderValidatorSpec.scala b/src/test/scala/io/iohk/ethereum/consensus/validators/BlockHeaderValidatorSpec.scala
index 1696b70651..f18db0e368 100644
--- a/src/test/scala/io/iohk/ethereum/consensus/validators/BlockHeaderValidatorSpec.scala
+++ b/src/test/scala/io/iohk/ethereum/consensus/validators/BlockHeaderValidatorSpec.scala
@@ -313,10 +313,8 @@ class BlockHeaderValidatorSpec
extends BlockHeaderValidatorSkeleton(blockchainConfig) {
override protected def difficulty: DifficultyCalculator = difficultyCalculator
- override def validateEvenMore(
- blockHeader: BlockHeader,
- parentHeader: BlockHeader
- ): Either[BlockHeaderError, BlockHeaderValid] = Right(BlockHeaderValid)
+ override def validateEvenMore(blockHeader: BlockHeader): Either[BlockHeaderError, BlockHeaderValid] =
+ Right(BlockHeaderValid)
}
val parentBody: BlockBody = BlockBody.empty
diff --git a/src/test/scala/io/iohk/ethereum/consensus/validators/BlockWithCheckpointHeaderValidatorSpec.scala b/src/test/scala/io/iohk/ethereum/consensus/validators/BlockWithCheckpointHeaderValidatorSpec.scala
index a7aa5b9dcd..ca97379159 100644
--- a/src/test/scala/io/iohk/ethereum/consensus/validators/BlockWithCheckpointHeaderValidatorSpec.scala
+++ b/src/test/scala/io/iohk/ethereum/consensus/validators/BlockWithCheckpointHeaderValidatorSpec.scala
@@ -301,10 +301,8 @@ class BlockWithCheckpointHeaderValidatorSpec
override def difficulty: DifficultyCalculator =
(_: BigInt, _: Long, _: BlockHeader) => 0
- override def validateEvenMore(
- blockHeader: BlockHeader,
- parentHeader: BlockHeader
- ): Either[BlockHeaderError, BlockHeaderValid] = Right(BlockHeaderValid)
+ override def validateEvenMore(blockHeader: BlockHeader): Either[BlockHeaderError, BlockHeaderValid] =
+ Right(BlockHeaderValid)
}
val blockHeaderValidator = blockHeaderValidatorBuilder(config)
diff --git a/src/test/scala/io/iohk/ethereum/ledger/LedgerTestSetup.scala b/src/test/scala/io/iohk/ethereum/ledger/LedgerTestSetup.scala
index 56c5e3a27c..ddda90b46f 100644
--- a/src/test/scala/io/iohk/ethereum/ledger/LedgerTestSetup.scala
+++ b/src/test/scala/io/iohk/ethereum/ledger/LedgerTestSetup.scala
@@ -7,7 +7,7 @@ import io.iohk.ethereum.blockchain.sync.EphemBlockchainTestSetup
import io.iohk.ethereum.consensus.blocks.CheckpointBlockGenerator
import io.iohk.ethereum.consensus.ethash.validators.{OmmersValidator, StdOmmersValidator}
import io.iohk.ethereum.consensus.validators.BlockHeaderError.HeaderParentNotFoundError
-import io.iohk.ethereum.consensus.validators.{BlockHeaderValidator, Validators}
+import io.iohk.ethereum.consensus.validators.{BlockHeaderError, BlockHeaderValid, BlockHeaderValidator, Validators}
import io.iohk.ethereum.consensus.{GetBlockHeaderByHash, GetNBlocksBack, TestConsensus}
import io.iohk.ethereum.crypto.{generateKeyPair, kec256}
import io.iohk.ethereum.domain._
@@ -333,8 +333,15 @@ trait TestSetupWithVmAndValidators extends EphemBlockchainTestSetup {
val execError = ValidationAfterExecError("error")
object FailHeaderValidation extends Mocks.MockValidatorsAlwaysSucceed {
- override val blockHeaderValidator: BlockHeaderValidator =
- (_: BlockHeader, _: GetBlockHeaderByHash) => Left(HeaderParentNotFoundError)
+ override val blockHeaderValidator: BlockHeaderValidator = new BlockHeaderValidator {
+ override def validate(
+ blockHeader: BlockHeader,
+ getBlockHeaderByHash: GetBlockHeaderByHash
+ ): Either[BlockHeaderError, BlockHeaderValid] = Left(HeaderParentNotFoundError)
+
+ override def validateHeaderOnly(blockHeader: BlockHeader): Either[BlockHeaderError, BlockHeaderValid] =
+ Left(HeaderParentNotFoundError)
+ }
}
object NotFailAfterExecValidation extends Mocks.MockValidatorsAlwaysSucceed {
diff --git a/src/test/scala/io/iohk/ethereum/network/handshaker/EtcHandshakerSpec.scala b/src/test/scala/io/iohk/ethereum/network/handshaker/EtcHandshakerSpec.scala
index a7f11a712c..039d639745 100644
--- a/src/test/scala/io/iohk/ethereum/network/handshaker/EtcHandshakerSpec.scala
+++ b/src/test/scala/io/iohk/ethereum/network/handshaker/EtcHandshakerSpec.scala
@@ -269,7 +269,8 @@ class EtcHandshakerSpec extends AnyFlatSpec with Matchers {
)
lazy val nodeStatusHolder = new AtomicReference(nodeStatus)
- class MockEtcHandshakerConfiguration(pv: Int = Config.Network.protocolVersion) extends EtcHandshakerConfiguration {
+ class MockEtcHandshakerConfiguration(pv: Int = Config.Network.protocolVersion)
+ extends EtcHandshakerConfiguration {
override val forkResolverOpt: Option[ForkResolver] = None
override val nodeStatusHolder: AtomicReference[NodeStatus] = TestSetup.this.nodeStatusHolder
override val peerConfiguration: PeerConfiguration = Config.Network.peer