@@ -24,7 +24,6 @@ import monix.execution.Scheduler
2424
2525import scala .concurrent .duration ._
2626
27- // scalastyle:off cyclomatic.complexity
2827class BlockImporter (
2928 fetcher : ActorRef ,
3029 ledger : Ledger ,
@@ -53,16 +52,9 @@ class BlockImporter(
5352 start()
5453 }
5554
56- private def handleTopMessages (state : ImporterState , currentBehavior : Behavior ): Receive = {
57- case OnTop => context become currentBehavior(state.onTop())
58- case NotOnTop => context become currentBehavior(state.notOnTop())
59- }
60-
61- private def running (state : ImporterState ): Receive = handleTopMessages(state, running) orElse {
55+ private def running (state : ImporterState ): Receive = {
6256 case ReceiveTimeout => self ! PickBlocks
6357
64- case PrintStatus => log.info(" Block: {}, is on top?: {}" , blockchain.getBestBlockNumber(), state.isOnTop)
65-
6658 case BlockFetcher .PickedBlocks (blocks) =>
6759 SignedTransaction .retrieveSendersInBackGround(blocks.toList.map(_.body))
6860 importBlocks(blocks, DefaultBlockImport )(state)
@@ -89,7 +81,7 @@ class BlockImporter(
8981 internally = true
9082 )(state)
9183
92- case ImportNewBlock (block, peerId) if state.isOnTop && ! state.importing =>
84+ case ImportNewBlock (block, peerId) if ! state.importing =>
9385 importBlock(
9486 block,
9587 new NewBlockImportMessages (block, peerId),
@@ -121,36 +113,34 @@ class BlockImporter(
121113 running(state.resolvingBranch(from))
122114
123115 private def start (): Unit = {
124- log.debug(" Starting Regular Sync, current best block is {}" , startingBlockNumber )
125- fetcher ! BlockFetcher .Start (self, startingBlockNumber )
126- supervisor ! ProgressProtocol .StartingFrom (startingBlockNumber )
116+ log.debug(" Starting Regular Sync, current best block is {}" , bestKnownBlockNumber )
117+ fetcher ! BlockFetcher .Start (self, bestKnownBlockNumber )
118+ supervisor ! ProgressProtocol .StartingFrom (bestKnownBlockNumber )
127119 context become running(ImporterState .initial)
128120 }
129121
130122 private def pickBlocks (state : ImporterState ): Unit = {
131- val msg =
132- state.resolvingBranchFrom.fold[BlockFetcher .FetchCommand ](
133- BlockFetcher .PickBlocks (syncConfig.blocksBatchSize, self)
134- )(from => BlockFetcher .StrictPickBlocks (from, startingBlockNumber, self))
123+ val msg = state.resolvingBranchFrom.fold[BlockFetcher .FetchCommand ](
124+ BlockFetcher .PickBlocks (syncConfig.blocksBatchSize, self)
125+ )(from => BlockFetcher .StrictPickBlocks (from, bestKnownBlockNumber, self))
135126
136127 fetcher ! msg
137128 }
138129
139130 private def importBlocks (blocks : NonEmptyList [Block ], blockImportType : BlockImportType ): ImportFn = importWith(
140- {
141- Task (
131+ Task
132+ .now {
142133 log.debug(
143134 " Attempting to import blocks starting from {} and ending with {}" ,
144135 blocks.head.number,
145136 blocks.last.number
146137 )
147- )
148- .flatMap(_ => Task .now(resolveBranch(blocks)))
149- .flatMap {
150- case Right (blocksToImport) => handleBlocksImport(blocksToImport)
151- case Left (resolvingFrom) => Task .now(ResolvingBranch (resolvingFrom))
152- }
153- },
138+ resolveBranch(blocks)
139+ }
140+ .flatMap {
141+ case Right (blocksToImport) => handleBlocksImport(blocksToImport)
142+ case Left (resolvingFrom) => Task .now(ResolvingBranch (resolvingFrom))
143+ },
154144 blockImportType
155145 )
156146
@@ -187,12 +177,9 @@ class BlockImporter(
187177 importedBlocks : List [Block ] = Nil
188178 ): Task [(List [Block ], Option [Any ])] =
189179 if (blocks.isEmpty) {
190- importedBlocks.headOption match {
191- case Some (block) =>
192- supervisor ! ProgressProtocol .ImportedBlock (block.number, internally = false )
193- case None => ()
194- }
195-
180+ importedBlocks.headOption.foreach(block =>
181+ supervisor ! ProgressProtocol .ImportedBlock (block.number, internally = false )
182+ )
196183 Task .now((importedBlocks, None ))
197184 } else {
198185 val restOfBlocks = blocks.tail
@@ -244,27 +231,22 @@ class BlockImporter(
244231 broadcastBlocks(blocks, weights)
245232 updateTxPool(importedBlocksData.map(_.block), Seq .empty)
246233 supervisor ! ProgressProtocol .ImportedBlock (block.number, internally)
247- case BlockEnqueued => ()
248- case DuplicateBlock => ()
249- case UnknownParent => () // This is normal when receiving broadcast blocks
250234 case ChainReorganised (oldBranch, newBranch, weights) =>
251235 updateTxPool(newBranch, oldBranch)
252236 broadcastBlocks(newBranch, weights)
253- newBranch.lastOption match {
254- case Some (newBlock) =>
255- supervisor ! ProgressProtocol .ImportedBlock (newBlock.number, internally)
256- case None => ()
257- }
237+ newBranch.lastOption.foreach(block =>
238+ supervisor ! ProgressProtocol .ImportedBlock (block.number, internally)
239+ )
258240 case BlockImportFailedDueToMissingNode (missingNodeException) if syncConfig.redownloadMissingStateNodes =>
259241 // state node re-download will be handled when downloading headers
260242 doLog(importMessages.missingStateNode(missingNodeException))
261243 Running
262244 case BlockImportFailedDueToMissingNode (missingNodeException) =>
263245 Task .raiseError(missingNodeException)
264- case BlockImportFailed (error) =>
265- if (informFetcherOnFail) {
266- fetcher ! BlockFetcher . BlockImportFailed (block.number, BlacklistReason . BlockImportError (error) )
267- }
246+ case BlockImportFailed (error) if informFetcherOnFail =>
247+ fetcher ! BlockFetcher . BlockImportFailed (block.number, BlacklistReason . BlockImportError (error))
248+ case BlockEnqueued | DuplicateBlock | UnknownParent | BlockImportFailed (_) => ( )
249+ case result => log.error( " Unknown block import result {} " , result)
268250 }
269251 .map(_ => Running )
270252 },
@@ -279,9 +261,7 @@ class BlockImporter(
279261
280262 private def updateTxPool (blocksAdded : Seq [Block ], blocksRemoved : Seq [Block ]): Unit = {
281263 blocksRemoved.foreach(block => pendingTransactionsManager ! AddUncheckedTransactions (block.body.transactionList))
282- blocksAdded.foreach { block =>
283- pendingTransactionsManager ! RemoveTransactions (block.body.transactionList)
284- }
264+ blocksAdded.foreach(block => pendingTransactionsManager ! RemoveTransactions (block.body.transactionList))
285265 }
286266
287267 private def importWith (importTask : Task [NewBehavior ], blockImportType : BlockImportType )(
@@ -303,7 +283,6 @@ class BlockImporter(
303283 case NewBetterBranch (oldBranch) =>
304284 val transactionsToAdd = oldBranch.flatMap(_.body.transactionList)
305285 pendingTransactionsManager ! PendingTransactionsManager .AddUncheckedTransactions (transactionsToAdd)
306-
307286 // Add first block from branch as an ommer
308287 oldBranch.headOption.map(_.header).foreach(ommersPool ! AddOmmers (_))
309288 Right (blocks.toList)
@@ -312,23 +291,21 @@ class BlockImporter(
312291 ommersPool ! AddOmmers (blocks.head.header)
313292 Right (Nil )
314293 case UnknownBranch =>
315- val currentBlock = blocks.head.number.min(startingBlockNumber )
294+ val currentBlock = blocks.head.number.min(bestKnownBlockNumber )
316295 val goingBackTo = (currentBlock - syncConfig.branchResolutionRequestSize).max(0 )
317296 val msg = s " Unknown branch, going back to block nr $goingBackTo in order to resolve branches "
318-
319297 log.info(msg)
320298 fetcher ! BlockFetcher .InvalidateBlocksFrom (goingBackTo, msg, shouldBlacklist = false )
321299 Left (goingBackTo)
322300 case InvalidBranch =>
323301 val goingBackTo = blocks.head.number
324302 val msg = s " Invalid branch, going back to $goingBackTo"
325-
326303 log.info(msg)
327304 fetcher ! BlockFetcher .InvalidateBlocksFrom (goingBackTo, msg)
328305 Right (Nil )
329306 }
330307
331- private def startingBlockNumber : BigInt = blockchain.getBestBlockNumber()
308+ private def bestKnownBlockNumber : BigInt = blockchain.getBestBlockNumber()
332309
333310 private def getBehavior (newBehavior : NewBehavior , blockImportType : BlockImportType ): Behavior = newBehavior match {
334311 case Running => running
@@ -338,7 +315,6 @@ class BlockImporter(
338315}
339316
340317object BlockImporter {
341- // scalastyle:off parameter.number
342318 def props (
343319 fetcher : ActorRef ,
344320 ledger : Ledger ,
@@ -367,8 +343,6 @@ object BlockImporter {
367343
368344 sealed trait ImporterMsg
369345 case object Start extends ImporterMsg
370- case object OnTop extends ImporterMsg
371- case object NotOnTop extends ImporterMsg
372346 case class MinedBlock (block : Block ) extends ImporterMsg
373347 case class NewCheckpoint (block : Block ) extends ImporterMsg
374348 case class ImportNewBlock (block : Block , peerId : PeerId ) extends ImporterMsg
@@ -402,14 +376,9 @@ object BlockImporter {
402376 }
403377
404378 case class ImporterState (
405- isOnTop : Boolean ,
406379 importing : Boolean ,
407380 resolvingBranchFrom : Option [BigInt ]
408381 ) {
409- def onTop (): ImporterState = copy(isOnTop = true )
410-
411- def notOnTop (): ImporterState = copy(isOnTop = false )
412-
413382 def importingBlocks (): ImporterState = copy(importing = true )
414383
415384 def notImportingBlocks (): ImporterState = copy(importing = false )
@@ -423,7 +392,6 @@ object BlockImporter {
423392
424393 object ImporterState {
425394 def initial : ImporterState = ImporterState (
426- isOnTop = false ,
427395 importing = false ,
428396 resolvingBranchFrom = None
429397 )
0 commit comments