From f4843b86d04526a1fc9698e14ba0679089f6cf48 Mon Sep 17 00:00:00 2001 From: Anastasiia Pushkina Date: Mon, 21 Jun 2021 10:35:53 +0200 Subject: [PATCH] ETCM-941: Simplify Block Importer Scalafmt Fix comments --- .../sync/regular/BlockFetcher.scala | 6 +- .../sync/regular/BlockImporter.scala | 90 ++++++------------- .../blockchain/sync/regular/RegularSync.scala | 8 -- .../sync/regular/BlockFetcherSpec.scala | 4 - 4 files changed, 31 insertions(+), 77 deletions(-) diff --git a/src/main/scala/io/iohk/ethereum/blockchain/sync/regular/BlockFetcher.scala b/src/main/scala/io/iohk/ethereum/blockchain/sync/regular/BlockFetcher.scala index 47be28645e..4651ee6d32 100644 --- a/src/main/scala/io/iohk/ethereum/blockchain/sync/regular/BlockFetcher.scala +++ b/src/main/scala/io/iohk/ethereum/blockchain/sync/regular/BlockFetcher.scala @@ -15,7 +15,7 @@ import io.iohk.ethereum.blockchain.sync.regular.BlockFetcherState.{ HeadersNotFormingSeq, HeadersNotMatchingReadyBlocks } -import io.iohk.ethereum.blockchain.sync.regular.BlockImporter.{ImportNewBlock, NotOnTop, OnTop} +import io.iohk.ethereum.blockchain.sync.regular.BlockImporter.ImportNewBlock import io.iohk.ethereum.blockchain.sync.regular.RegularSync.ProgressProtocol import io.iohk.ethereum.domain._ import io.iohk.ethereum.network.PeerEventBusActor.PeerEvent.MessageFromPeer @@ -255,7 +255,6 @@ class BlockFetcher( .withPeerForBlocks(peerId, Seq(newBlockNr)) .withLastBlock(newBlockNr) .withKnownTopAt(newBlockNr) - state.importer ! OnTop state.importer ! ImportNewBlock(block, peerId) supervisor ! ProgressProtocol.GotNewBlock(newState.knownTop) processFetchCommands(newState) @@ -276,9 +275,8 @@ class BlockFetcher( replyTo: ClassicActorRef )(pickResult: Option[(NonEmptyList[Block], BlockFetcherState)]): BlockFetcherState = pickResult - .tap { case (blocks, newState) => + .tap { case (blocks, _) => replyTo ! PickedBlocks(blocks) - replyTo ! (if (newState.isOnTop) OnTop else NotOnTop) } .fold(state)(_._2) diff --git a/src/main/scala/io/iohk/ethereum/blockchain/sync/regular/BlockImporter.scala b/src/main/scala/io/iohk/ethereum/blockchain/sync/regular/BlockImporter.scala index e14d70940c..2759971da4 100644 --- a/src/main/scala/io/iohk/ethereum/blockchain/sync/regular/BlockImporter.scala +++ b/src/main/scala/io/iohk/ethereum/blockchain/sync/regular/BlockImporter.scala @@ -24,7 +24,6 @@ import monix.execution.Scheduler import scala.concurrent.duration._ -// scalastyle:off cyclomatic.complexity class BlockImporter( fetcher: ActorRef, ledger: Ledger, @@ -53,16 +52,9 @@ class BlockImporter( start() } - private def handleTopMessages(state: ImporterState, currentBehavior: Behavior): Receive = { - case OnTop => context become currentBehavior(state.onTop()) - case NotOnTop => context become currentBehavior(state.notOnTop()) - } - - private def running(state: ImporterState): Receive = handleTopMessages(state, running) orElse { + private def running(state: ImporterState): Receive = { case ReceiveTimeout => self ! PickBlocks - case PrintStatus => log.info("Block: {}, is on top?: {}", blockchain.getBestBlockNumber(), state.isOnTop) - case BlockFetcher.PickedBlocks(blocks) => SignedTransaction.retrieveSendersInBackGround(blocks.toList.map(_.body)) importBlocks(blocks, DefaultBlockImport)(state) @@ -89,7 +81,7 @@ class BlockImporter( internally = true )(state) - case ImportNewBlock(block, peerId) if state.isOnTop && !state.importing => + case ImportNewBlock(block, peerId) if !state.importing => importBlock( block, new NewBlockImportMessages(block, peerId), @@ -121,36 +113,34 @@ class BlockImporter( running(state.resolvingBranch(from)) private def start(): Unit = { - log.debug("Starting Regular Sync, current best block is {}", startingBlockNumber) - fetcher ! BlockFetcher.Start(self, startingBlockNumber) - supervisor ! ProgressProtocol.StartingFrom(startingBlockNumber) + log.debug("Starting Regular Sync, current best block is {}", bestKnownBlockNumber) + fetcher ! BlockFetcher.Start(self, bestKnownBlockNumber) + supervisor ! ProgressProtocol.StartingFrom(bestKnownBlockNumber) context become running(ImporterState.initial) } private def pickBlocks(state: ImporterState): Unit = { - val msg = - state.resolvingBranchFrom.fold[BlockFetcher.FetchCommand]( - BlockFetcher.PickBlocks(syncConfig.blocksBatchSize, self) - )(from => BlockFetcher.StrictPickBlocks(from, startingBlockNumber, self)) + val msg = state.resolvingBranchFrom.fold[BlockFetcher.FetchCommand]( + BlockFetcher.PickBlocks(syncConfig.blocksBatchSize, self) + )(from => BlockFetcher.StrictPickBlocks(from, bestKnownBlockNumber, self)) fetcher ! msg } private def importBlocks(blocks: NonEmptyList[Block], blockImportType: BlockImportType): ImportFn = importWith( - { - Task( + Task + .now { log.debug( "Attempting to import blocks starting from {} and ending with {}", blocks.head.number, blocks.last.number ) - ) - .flatMap(_ => Task.now(resolveBranch(blocks))) - .flatMap { - case Right(blocksToImport) => handleBlocksImport(blocksToImport) - case Left(resolvingFrom) => Task.now(ResolvingBranch(resolvingFrom)) - } - }, + resolveBranch(blocks) + } + .flatMap { + case Right(blocksToImport) => handleBlocksImport(blocksToImport) + case Left(resolvingFrom) => Task.now(ResolvingBranch(resolvingFrom)) + }, blockImportType ) @@ -187,12 +177,9 @@ class BlockImporter( importedBlocks: List[Block] = Nil ): Task[(List[Block], Option[Any])] = if (blocks.isEmpty) { - importedBlocks.headOption match { - case Some(block) => - supervisor ! ProgressProtocol.ImportedBlock(block.number, internally = false) - case None => () - } - + importedBlocks.headOption.foreach(block => + supervisor ! ProgressProtocol.ImportedBlock(block.number, internally = false) + ) Task.now((importedBlocks, None)) } else { val restOfBlocks = blocks.tail @@ -244,27 +231,22 @@ class BlockImporter( broadcastBlocks(blocks, weights) updateTxPool(importedBlocksData.map(_.block), Seq.empty) supervisor ! ProgressProtocol.ImportedBlock(block.number, internally) - case BlockEnqueued => () - case DuplicateBlock => () - case UnknownParent => () // This is normal when receiving broadcast blocks case ChainReorganised(oldBranch, newBranch, weights) => updateTxPool(newBranch, oldBranch) broadcastBlocks(newBranch, weights) - newBranch.lastOption match { - case Some(newBlock) => - supervisor ! ProgressProtocol.ImportedBlock(newBlock.number, internally) - case None => () - } + newBranch.lastOption.foreach(block => + supervisor ! ProgressProtocol.ImportedBlock(block.number, internally) + ) case BlockImportFailedDueToMissingNode(missingNodeException) if syncConfig.redownloadMissingStateNodes => // state node re-download will be handled when downloading headers doLog(importMessages.missingStateNode(missingNodeException)) Running case BlockImportFailedDueToMissingNode(missingNodeException) => Task.raiseError(missingNodeException) - case BlockImportFailed(error) => - if (informFetcherOnFail) { - fetcher ! BlockFetcher.BlockImportFailed(block.number, BlacklistReason.BlockImportError(error)) - } + case BlockImportFailed(error) if informFetcherOnFail => + fetcher ! BlockFetcher.BlockImportFailed(block.number, BlacklistReason.BlockImportError(error)) + case BlockEnqueued | DuplicateBlock | UnknownParent | BlockImportFailed(_) => () + case result => log.error("Unknown block import result {}", result) } .map(_ => Running) }, @@ -279,9 +261,7 @@ class BlockImporter( private def updateTxPool(blocksAdded: Seq[Block], blocksRemoved: Seq[Block]): Unit = { blocksRemoved.foreach(block => pendingTransactionsManager ! AddUncheckedTransactions(block.body.transactionList)) - blocksAdded.foreach { block => - pendingTransactionsManager ! RemoveTransactions(block.body.transactionList) - } + blocksAdded.foreach(block => pendingTransactionsManager ! RemoveTransactions(block.body.transactionList)) } private def importWith(importTask: Task[NewBehavior], blockImportType: BlockImportType)( @@ -303,7 +283,6 @@ class BlockImporter( case NewBetterBranch(oldBranch) => val transactionsToAdd = oldBranch.flatMap(_.body.transactionList) pendingTransactionsManager ! PendingTransactionsManager.AddUncheckedTransactions(transactionsToAdd) - // Add first block from branch as an ommer oldBranch.headOption.map(_.header).foreach(ommersPool ! AddOmmers(_)) Right(blocks.toList) @@ -312,23 +291,21 @@ class BlockImporter( ommersPool ! AddOmmers(blocks.head.header) Right(Nil) case UnknownBranch => - val currentBlock = blocks.head.number.min(startingBlockNumber) + val currentBlock = blocks.head.number.min(bestKnownBlockNumber) val goingBackTo = (currentBlock - syncConfig.branchResolutionRequestSize).max(0) val msg = s"Unknown branch, going back to block nr $goingBackTo in order to resolve branches" - log.info(msg) fetcher ! BlockFetcher.InvalidateBlocksFrom(goingBackTo, msg, shouldBlacklist = false) Left(goingBackTo) case InvalidBranch => val goingBackTo = blocks.head.number val msg = s"Invalid branch, going back to $goingBackTo" - log.info(msg) fetcher ! BlockFetcher.InvalidateBlocksFrom(goingBackTo, msg) Right(Nil) } - private def startingBlockNumber: BigInt = blockchain.getBestBlockNumber() + private def bestKnownBlockNumber: BigInt = blockchain.getBestBlockNumber() private def getBehavior(newBehavior: NewBehavior, blockImportType: BlockImportType): Behavior = newBehavior match { case Running => running @@ -338,7 +315,6 @@ class BlockImporter( } object BlockImporter { - // scalastyle:off parameter.number def props( fetcher: ActorRef, ledger: Ledger, @@ -367,8 +343,6 @@ object BlockImporter { sealed trait ImporterMsg case object Start extends ImporterMsg - case object OnTop extends ImporterMsg - case object NotOnTop extends ImporterMsg case class MinedBlock(block: Block) extends ImporterMsg case class NewCheckpoint(block: Block) extends ImporterMsg case class ImportNewBlock(block: Block, peerId: PeerId) extends ImporterMsg @@ -402,14 +376,9 @@ object BlockImporter { } case class ImporterState( - isOnTop: Boolean, importing: Boolean, resolvingBranchFrom: Option[BigInt] ) { - def onTop(): ImporterState = copy(isOnTop = true) - - def notOnTop(): ImporterState = copy(isOnTop = false) - def importingBlocks(): ImporterState = copy(importing = true) def notImportingBlocks(): ImporterState = copy(importing = false) @@ -423,7 +392,6 @@ object BlockImporter { object ImporterState { def initial: ImporterState = ImporterState( - isOnTop = false, importing = false, resolvingBranchFrom = None ) diff --git a/src/main/scala/io/iohk/ethereum/blockchain/sync/regular/RegularSync.scala b/src/main/scala/io/iohk/ethereum/blockchain/sync/regular/RegularSync.scala index 06f6ae1bf5..a8d1db6d3e 100644 --- a/src/main/scala/io/iohk/ethereum/blockchain/sync/regular/RegularSync.scala +++ b/src/main/scala/io/iohk/ethereum/blockchain/sync/regular/RegularSync.scala @@ -64,13 +64,6 @@ class RegularSync( fetcher.toClassic, BlockFetcher.PrintStatus )(context.dispatcher) - val printImporterSchedule: Cancellable = - scheduler.scheduleWithFixedDelay( - syncConfig.printStatusInterval, - syncConfig.printStatusInterval, - importer, - BlockImporter.PrintStatus - )(context.dispatcher) override def receive: Receive = running( ProgressState(startedFetching = false, initialBlock = 0, currentBlock = 0, bestKnownNetworkBlock = 0) @@ -115,7 +108,6 @@ class RegularSync( override def postStop(): Unit = { log.info("Regular Sync stopped") printFetcherSchedule.cancel() - printImporterSchedule.cancel() } } object RegularSync { diff --git a/src/test/scala/io/iohk/ethereum/blockchain/sync/regular/BlockFetcherSpec.scala b/src/test/scala/io/iohk/ethereum/blockchain/sync/regular/BlockFetcherSpec.scala index b13eb3eb57..2ad09ef63f 100644 --- a/src/test/scala/io/iohk/ethereum/blockchain/sync/regular/BlockFetcherSpec.scala +++ b/src/test/scala/io/iohk/ethereum/blockchain/sync/regular/BlockFetcherSpec.scala @@ -121,7 +121,6 @@ class BlockFetcherSpec extends ScalaTestWithActorTestKit() with AnyFreeSpecLike // Fetcher should not enqueue any new block importer.send(blockFetcher.toClassic, PickBlocks(syncConfig.blocksBatchSize, importer.ref)) - importer.ignoreMsg({ case BlockImporter.NotOnTop => true }) importer.expectNoMessage(100.millis) } @@ -152,7 +151,6 @@ class BlockFetcherSpec extends ScalaTestWithActorTestKit() with AnyFreeSpecLike importer.send(blockFetcher.toClassic, PickBlocks(firstBlocksBatch.size, importer.ref)) } - importer.ignoreMsg({ case BlockImporter.NotOnTop => true }) importer.expectMsgPF() { case BlockFetcher.PickedBlocks(blocks) => blocks.map(_.hash).toList shouldEqual firstBlocksBatch.map(_.hash) } @@ -182,7 +180,6 @@ class BlockFetcherSpec extends ScalaTestWithActorTestKit() with AnyFreeSpecLike // If we try to pick the whole chain we should only receive the first part importer.send(blockFetcher.toClassic, PickBlocks(firstBlocksBatch.size, importer.ref)) - importer.ignoreMsg({ case BlockImporter.NotOnTop => true }) importer.expectMsgPF() { case BlockFetcher.PickedBlocks(blocks) => blocks.map(_.hash).toList shouldEqual subChain1.map(_.hash) } @@ -243,7 +240,6 @@ class BlockFetcherSpec extends ScalaTestWithActorTestKit() with AnyFreeSpecLike ) importer.send(blockFetcher.toClassic, PickBlocks(syncConfig.blocksBatchSize, importer.ref)) - importer.ignoreMsg({ case BlockImporter.NotOnTop => true }) importer.expectMsgPF() { case BlockFetcher.PickedBlocks(blocks) => val headers = blocks.map(_.header).toList