Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import io.iohk.ethereum.blockchain.sync.regular.BlockFetcherState.{
HeadersNotFormingSeq,
HeadersNotMatchingReadyBlocks
}
import io.iohk.ethereum.blockchain.sync.regular.BlockImporter.{ImportNewBlock, NotOnTop, OnTop}
import io.iohk.ethereum.blockchain.sync.regular.BlockImporter.ImportNewBlock
import io.iohk.ethereum.blockchain.sync.regular.RegularSync.ProgressProtocol
import io.iohk.ethereum.domain._
import io.iohk.ethereum.network.PeerEventBusActor.PeerEvent.MessageFromPeer
Expand Down Expand Up @@ -255,7 +255,6 @@ class BlockFetcher(
.withPeerForBlocks(peerId, Seq(newBlockNr))
.withLastBlock(newBlockNr)
.withKnownTopAt(newBlockNr)
state.importer ! OnTop
state.importer ! ImportNewBlock(block, peerId)
supervisor ! ProgressProtocol.GotNewBlock(newState.knownTop)
processFetchCommands(newState)
Expand All @@ -276,9 +275,8 @@ class BlockFetcher(
replyTo: ClassicActorRef
)(pickResult: Option[(NonEmptyList[Block], BlockFetcherState)]): BlockFetcherState =
pickResult
.tap { case (blocks, newState) =>
.tap { case (blocks, _) =>
replyTo ! PickedBlocks(blocks)
replyTo ! (if (newState.isOnTop) OnTop else NotOnTop)
}
.fold(state)(_._2)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import monix.execution.Scheduler

import scala.concurrent.duration._

// scalastyle:off cyclomatic.complexity
Copy link
Contributor

@leo-bogastry leo-bogastry Jun 23, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's great that we don't need this anymore 🥳

class BlockImporter(
fetcher: ActorRef,
ledger: Ledger,
Expand Down Expand Up @@ -53,16 +52,9 @@ class BlockImporter(
start()
}

private def handleTopMessages(state: ImporterState, currentBehavior: Behavior): Receive = {
case OnTop => context become currentBehavior(state.onTop())
case NotOnTop => context become currentBehavior(state.notOnTop())
}

private def running(state: ImporterState): Receive = handleTopMessages(state, running) orElse {
private def running(state: ImporterState): Receive = {
case ReceiveTimeout => self ! PickBlocks

case PrintStatus => log.info("Block: {}, is on top?: {}", blockchain.getBestBlockNumber(), state.isOnTop)

case BlockFetcher.PickedBlocks(blocks) =>
SignedTransaction.retrieveSendersInBackGround(blocks.toList.map(_.body))
importBlocks(blocks, DefaultBlockImport)(state)
Expand All @@ -89,7 +81,7 @@ class BlockImporter(
internally = true
)(state)

case ImportNewBlock(block, peerId) if state.isOnTop && !state.importing =>
case ImportNewBlock(block, peerId) if !state.importing =>
importBlock(
block,
new NewBlockImportMessages(block, peerId),
Expand Down Expand Up @@ -121,36 +113,34 @@ class BlockImporter(
running(state.resolvingBranch(from))

private def start(): Unit = {
log.debug("Starting Regular Sync, current best block is {}", startingBlockNumber)
fetcher ! BlockFetcher.Start(self, startingBlockNumber)
supervisor ! ProgressProtocol.StartingFrom(startingBlockNumber)
log.debug("Starting Regular Sync, current best block is {}", bestKnownBlockNumber)
fetcher ! BlockFetcher.Start(self, bestKnownBlockNumber)
supervisor ! ProgressProtocol.StartingFrom(bestKnownBlockNumber)
context become running(ImporterState.initial)
}

private def pickBlocks(state: ImporterState): Unit = {
val msg =
state.resolvingBranchFrom.fold[BlockFetcher.FetchCommand](
BlockFetcher.PickBlocks(syncConfig.blocksBatchSize, self)
)(from => BlockFetcher.StrictPickBlocks(from, startingBlockNumber, self))
val msg = state.resolvingBranchFrom.fold[BlockFetcher.FetchCommand](
BlockFetcher.PickBlocks(syncConfig.blocksBatchSize, self)
)(from => BlockFetcher.StrictPickBlocks(from, bestKnownBlockNumber, self))

fetcher ! msg
}

private def importBlocks(blocks: NonEmptyList[Block], blockImportType: BlockImportType): ImportFn = importWith(
{
Task(
Task
.now {
log.debug(
"Attempting to import blocks starting from {} and ending with {}",
blocks.head.number,
blocks.last.number
)
)
.flatMap(_ => Task.now(resolveBranch(blocks)))
.flatMap {
case Right(blocksToImport) => handleBlocksImport(blocksToImport)
case Left(resolvingFrom) => Task.now(ResolvingBranch(resolvingFrom))
}
},
resolveBranch(blocks)
}
.flatMap {
case Right(blocksToImport) => handleBlocksImport(blocksToImport)
case Left(resolvingFrom) => Task.now(ResolvingBranch(resolvingFrom))
},
blockImportType
)

Expand Down Expand Up @@ -187,12 +177,9 @@ class BlockImporter(
importedBlocks: List[Block] = Nil
): Task[(List[Block], Option[Any])] =
if (blocks.isEmpty) {
importedBlocks.headOption match {
case Some(block) =>
supervisor ! ProgressProtocol.ImportedBlock(block.number, internally = false)
case None => ()
}

importedBlocks.headOption.foreach(block =>
supervisor ! ProgressProtocol.ImportedBlock(block.number, internally = false)
)
Task.now((importedBlocks, None))
} else {
val restOfBlocks = blocks.tail
Expand Down Expand Up @@ -244,27 +231,22 @@ class BlockImporter(
broadcastBlocks(blocks, weights)
updateTxPool(importedBlocksData.map(_.block), Seq.empty)
supervisor ! ProgressProtocol.ImportedBlock(block.number, internally)
case BlockEnqueued => ()
case DuplicateBlock => ()
case UnknownParent => () // This is normal when receiving broadcast blocks
case ChainReorganised(oldBranch, newBranch, weights) =>
updateTxPool(newBranch, oldBranch)
broadcastBlocks(newBranch, weights)
newBranch.lastOption match {
case Some(newBlock) =>
supervisor ! ProgressProtocol.ImportedBlock(newBlock.number, internally)
case None => ()
}
newBranch.lastOption.foreach(block =>
supervisor ! ProgressProtocol.ImportedBlock(block.number, internally)
)
case BlockImportFailedDueToMissingNode(missingNodeException) if syncConfig.redownloadMissingStateNodes =>
// state node re-download will be handled when downloading headers
doLog(importMessages.missingStateNode(missingNodeException))
Running
case BlockImportFailedDueToMissingNode(missingNodeException) =>
Task.raiseError(missingNodeException)
case BlockImportFailed(error) =>
if (informFetcherOnFail) {
fetcher ! BlockFetcher.BlockImportFailed(block.number, BlacklistReason.BlockImportError(error))
}
case BlockImportFailed(error) if informFetcherOnFail =>
fetcher ! BlockFetcher.BlockImportFailed(block.number, BlacklistReason.BlockImportError(error))
case BlockEnqueued | DuplicateBlock | UnknownParent | BlockImportFailed(_) => ()
case result => log.error("Unknown block import result {}", result)
}
.map(_ => Running)
},
Expand All @@ -279,9 +261,7 @@ class BlockImporter(

private def updateTxPool(blocksAdded: Seq[Block], blocksRemoved: Seq[Block]): Unit = {
blocksRemoved.foreach(block => pendingTransactionsManager ! AddUncheckedTransactions(block.body.transactionList))
blocksAdded.foreach { block =>
pendingTransactionsManager ! RemoveTransactions(block.body.transactionList)
}
blocksAdded.foreach(block => pendingTransactionsManager ! RemoveTransactions(block.body.transactionList))
}

private def importWith(importTask: Task[NewBehavior], blockImportType: BlockImportType)(
Expand All @@ -303,7 +283,6 @@ class BlockImporter(
case NewBetterBranch(oldBranch) =>
val transactionsToAdd = oldBranch.flatMap(_.body.transactionList)
pendingTransactionsManager ! PendingTransactionsManager.AddUncheckedTransactions(transactionsToAdd)

// Add first block from branch as an ommer
oldBranch.headOption.map(_.header).foreach(ommersPool ! AddOmmers(_))
Right(blocks.toList)
Expand All @@ -312,23 +291,21 @@ class BlockImporter(
ommersPool ! AddOmmers(blocks.head.header)
Right(Nil)
case UnknownBranch =>
val currentBlock = blocks.head.number.min(startingBlockNumber)
val currentBlock = blocks.head.number.min(bestKnownBlockNumber)
val goingBackTo = (currentBlock - syncConfig.branchResolutionRequestSize).max(0)
val msg = s"Unknown branch, going back to block nr $goingBackTo in order to resolve branches"

log.info(msg)
fetcher ! BlockFetcher.InvalidateBlocksFrom(goingBackTo, msg, shouldBlacklist = false)
Left(goingBackTo)
case InvalidBranch =>
val goingBackTo = blocks.head.number
val msg = s"Invalid branch, going back to $goingBackTo"

log.info(msg)
fetcher ! BlockFetcher.InvalidateBlocksFrom(goingBackTo, msg)
Right(Nil)
}

private def startingBlockNumber: BigInt = blockchain.getBestBlockNumber()
private def bestKnownBlockNumber: BigInt = blockchain.getBestBlockNumber()

private def getBehavior(newBehavior: NewBehavior, blockImportType: BlockImportType): Behavior = newBehavior match {
case Running => running
Expand All @@ -338,7 +315,6 @@ class BlockImporter(
}

object BlockImporter {
// scalastyle:off parameter.number
def props(
fetcher: ActorRef,
ledger: Ledger,
Expand Down Expand Up @@ -367,8 +343,6 @@ object BlockImporter {

sealed trait ImporterMsg
case object Start extends ImporterMsg
case object OnTop extends ImporterMsg
case object NotOnTop extends ImporterMsg
case class MinedBlock(block: Block) extends ImporterMsg
case class NewCheckpoint(block: Block) extends ImporterMsg
case class ImportNewBlock(block: Block, peerId: PeerId) extends ImporterMsg
Expand Down Expand Up @@ -402,14 +376,9 @@ object BlockImporter {
}

case class ImporterState(
isOnTop: Boolean,
importing: Boolean,
resolvingBranchFrom: Option[BigInt]
) {
def onTop(): ImporterState = copy(isOnTop = true)

def notOnTop(): ImporterState = copy(isOnTop = false)

def importingBlocks(): ImporterState = copy(importing = true)

def notImportingBlocks(): ImporterState = copy(importing = false)
Expand All @@ -423,7 +392,6 @@ object BlockImporter {

object ImporterState {
def initial: ImporterState = ImporterState(
isOnTop = false,
importing = false,
resolvingBranchFrom = None
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,13 +64,6 @@ class RegularSync(
fetcher.toClassic,
BlockFetcher.PrintStatus
)(context.dispatcher)
val printImporterSchedule: Cancellable =
scheduler.scheduleWithFixedDelay(
syncConfig.printStatusInterval,
syncConfig.printStatusInterval,
importer,
BlockImporter.PrintStatus
)(context.dispatcher)

override def receive: Receive = running(
ProgressState(startedFetching = false, initialBlock = 0, currentBlock = 0, bestKnownNetworkBlock = 0)
Expand Down Expand Up @@ -115,7 +108,6 @@ class RegularSync(
override def postStop(): Unit = {
log.info("Regular Sync stopped")
printFetcherSchedule.cancel()
printImporterSchedule.cancel()
}
}
object RegularSync {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,6 @@ class BlockFetcherSpec extends ScalaTestWithActorTestKit() with AnyFreeSpecLike

// Fetcher should not enqueue any new block
importer.send(blockFetcher.toClassic, PickBlocks(syncConfig.blocksBatchSize, importer.ref))
importer.ignoreMsg({ case BlockImporter.NotOnTop => true })
importer.expectNoMessage(100.millis)
}

Expand Down Expand Up @@ -152,7 +151,6 @@ class BlockFetcherSpec extends ScalaTestWithActorTestKit() with AnyFreeSpecLike
importer.send(blockFetcher.toClassic, PickBlocks(firstBlocksBatch.size, importer.ref))
}

importer.ignoreMsg({ case BlockImporter.NotOnTop => true })
importer.expectMsgPF() { case BlockFetcher.PickedBlocks(blocks) =>
blocks.map(_.hash).toList shouldEqual firstBlocksBatch.map(_.hash)
}
Expand Down Expand Up @@ -182,7 +180,6 @@ class BlockFetcherSpec extends ScalaTestWithActorTestKit() with AnyFreeSpecLike

// If we try to pick the whole chain we should only receive the first part
importer.send(blockFetcher.toClassic, PickBlocks(firstBlocksBatch.size, importer.ref))
importer.ignoreMsg({ case BlockImporter.NotOnTop => true })
importer.expectMsgPF() { case BlockFetcher.PickedBlocks(blocks) =>
blocks.map(_.hash).toList shouldEqual subChain1.map(_.hash)
}
Expand Down Expand Up @@ -243,7 +240,6 @@ class BlockFetcherSpec extends ScalaTestWithActorTestKit() with AnyFreeSpecLike
)

importer.send(blockFetcher.toClassic, PickBlocks(syncConfig.blocksBatchSize, importer.ref))
importer.ignoreMsg({ case BlockImporter.NotOnTop => true })
importer.expectMsgPF() { case BlockFetcher.PickedBlocks(blocks) =>
val headers = blocks.map(_.header).toList

Expand Down