@@ -296,7 +296,7 @@ class FastSync(
     blockchain.getTotalDifficultyByHash(header.parentHash).toRight(ParentDifficultyNotFound(header))
   }

-  private def handleBlockValidationError(header: BlockHeader, peer: Peer, N: Int): Unit = {
+  private def handleRewind(header: BlockHeader, peer: Peer, N: Int): Unit = {
     blacklist(peer.id, blacklistDuration, "block header validation failed")
     if (header.number <= syncState.safeDownloadTarget) {
       discardLastBlocks(header.number, N)
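The renamed handleRewind is now the shared recovery path for both header validation failures and missing parent difficulty (see the next hunk). A minimal, self-contained sketch of the bound this hunk enforces; the helper name rewoundBestBlock and the literal 32 (standing in for syncConfig.fastSyncBlockValidationN) are invented for illustration, not mantis's code:

object RewindSketch extends App {
  // Rewinding drops the last N blocks before the failing header, never below genesis.
  def rewoundBestBlock(failingBlock: BigInt, n: Int): BigInt =
    (failingBlock - n).max(0)

  val safeDownloadTarget = BigInt(1200)
  val failing = BigInt(998)
  // Rewind only applies while the failing header is still at or below the safe
  // download target; past it, fast sync updates its target block instead.
  if (failing <= safeDownloadTarget)
    println(s"rewinding to block ${rewoundBestBlock(failing, 32)}") // rewinding to block 966
}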
@@ -315,14 +315,18 @@ class FastSync(
     if (checkHeadersChain(headers)) {
       processHeaders(peer, headers) match {
         case ParentDifficultyNotFound(header) =>
-          log.debug("Parent difficulty not found for block {}, not processing rest of headers", header.number)
-          processSyncing()
+          // We could end up on a wrong fork and get stuck, so we should rewind our state a little.
+          // We also blacklist the peer in case it is malicious and sent us bad blocks, which would
+          // otherwise force us to roll back to genesis.
+          log.warning("Parent difficulty not found for block {}, not processing rest of headers", header.idTag)
+          handleRewind(header, peer, syncConfig.fastSyncBlockValidationN)
         case HeadersProcessingFinished =>
           processSyncing()
         case ImportedTargetBlock =>
           updateTargetBlock(ImportedLastBlock)
         case ValidationFailed(header, peerToBlackList) =>
-          handleBlockValidationError(header, peerToBlackList, syncConfig.fastSyncBlockValidationN)
+          log.warning(s"Validation of header ${header.idTag} failed")
+          handleRewind(header, peerToBlackList, syncConfig.fastSyncBlockValidationN)
       }
     } else {
       blacklist(peer.id, blacklistDuration, "error in block headers response")
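For reference, the four cases matched above imply a small result ADT returned by processHeaders. A hedged reconstruction from the visible cases only; the stub BlockHeader and Peer types and the exact field names are assumptions, not mantis's definitions:

object HeaderResultSketch {
  final case class BlockHeader(number: BigInt, idTag: String) // stand-in for the real type
  final case class Peer(id: String)                           // stand-in for the real type

  sealed trait HeadersProcessingResult
  case object HeadersProcessingFinished extends HeadersProcessingResult
  case object ImportedTargetBlock extends HeadersProcessingResult
  final case class ParentDifficultyNotFound(header: BlockHeader) extends HeadersProcessingResult
  final case class ValidationFailed(header: BlockHeader, peerToBlacklist: Peer) extends HeadersProcessingResult
}

Routing both ParentDifficultyNotFound and ValidationFailed into handleRewind keeps a single recovery path for any header the node cannot extend its chain with.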
@@ -385,8 +389,8 @@ class FastSync(
   }

   private def handleNodeData(peer: Peer, requestedHashes: Seq[HashType], nodeData: NodeData) = {
-    if (nodeData.values.isEmpty) {
-      log.debug(s"got empty mpt node response for known hashes switching to blockchain only: ${requestedHashes.map(h => Hex.toHexString(h.v.toArray[Byte]))}")
+    if (nodeData.values.isEmpty && requestedHashes.nonEmpty) {
+      log.info(s"got empty mpt node response for known hashes from peer ${peer.id}: ${requestedHashes.map(h => Hex.toHexString(h.v.toArray[Byte]))}")
       blacklist(peer.id, blacklistDuration, "empty mpt node response for known hashes")
     }

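The added requestedHashes.nonEmpty conjunct means an empty NodeData response only triggers blacklisting when the peer was actually asked for something. A tiny standalone sketch of the tightened predicate; the function name and types here are stand-ins, not the real API:

object EmptyResponseSketch extends App {
  def suspiciousEmptyResponse(responseValues: Seq[Array[Byte]], requestedHashes: Seq[String]): Boolean =
    responseValues.isEmpty && requestedHashes.nonEmpty

  println(suspiciousEmptyResponse(Nil, Nil))          // false: we asked for nothing, so an empty reply is fine
  println(suspiciousEmptyResponse(Nil, Seq("0xabc"))) // true: peer withheld data we know it should have
}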
@@ -679,24 +683,28 @@ class FastSync(
   }

   def requestNodes(peer: Peer): Unit = {
-    val (nonMptNodesToGet, remainingNonMptNodes) = syncState.pendingNonMptNodes.splitAt(nodesPerRequest)
-    val (mptNodesToGet, remainingMptNodes) = syncState.pendingMptNodes.splitAt(nodesPerRequest - nonMptNodesToGet.size)
-    val nodesToGet = nonMptNodesToGet ++ mptNodesToGet
-
-    val handler = context.actorOf(
-      PeerRequestHandler.props[GetNodeData, NodeData](
-        peer, peerResponseTimeout, etcPeerManager, peerEventBus,
-        requestMsg = GetNodeData(nodesToGet.map(_.v)),
-        responseMsgCode = NodeData.code))
-
-    context watch handler
-    assignedHandlers += (handler -> peer)
-    peerRequestsTime += (peer -> Instant.now())
-    syncState = syncState.copy(
-      pendingNonMptNodes = remainingNonMptNodes,
-      pendingMptNodes = remainingMptNodes)
-    requestedMptNodes += handler -> mptNodesToGet
-    requestedNonMptNodes += handler -> nonMptNodesToGet
+    if (syncState.pendingNonMptNodes.nonEmpty || syncState.pendingMptNodes.nonEmpty) {
+      val (nonMptNodesToGet, remainingNonMptNodes) = syncState.pendingNonMptNodes.splitAt(nodesPerRequest)
+      val (mptNodesToGet, remainingMptNodes) = syncState.pendingMptNodes.splitAt(nodesPerRequest - nonMptNodesToGet.size)
+      val nodesToGet = nonMptNodesToGet ++ mptNodesToGet
+      log.info(s"Requesting ${nodesToGet.size} nodes from peer ${peer.id}")
+      val handler = context.actorOf(
+        PeerRequestHandler.props[GetNodeData, NodeData](
+          peer, peerResponseTimeout, etcPeerManager, peerEventBus,
+          requestMsg = GetNodeData(nodesToGet.map(_.v)),
+          responseMsgCode = NodeData.code))
+
+      context watch handler
+      assignedHandlers += (handler -> peer)
+      peerRequestsTime += (peer -> Instant.now())
+      syncState = syncState.copy(
+        pendingNonMptNodes = remainingNonMptNodes,
+        pendingMptNodes = remainingMptNodes)
+      requestedMptNodes += handler -> mptNodesToGet
+      requestedNonMptNodes += handler -> nonMptNodesToGet
+    } else {
+      log.debug("There is no node work to assign to the peer")
+    }
   }

   def unassignedPeers: Set[Peer] = peersToDownloadFrom.keySet diff assignedHandlers.values.toSet
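The new emptiness guard also stops requestNodes from spawning a PeerRequestHandler for an empty GetNodeData request. The batching itself is unchanged: non-MPT hashes fill the request first and MPT hashes take the remaining capacity. A standalone sketch of the two splitAt calls, with invented hash values:

object BatchSketch extends App {
  val nodesPerRequest = 5
  val pendingNonMpt = List("evm1", "storage1")           // non-MPT node hashes (invented)
  val pendingMpt    = List("m1", "m2", "m3", "m4", "m5") // state trie node hashes (invented)

  // Non-MPT nodes are taken first; MPT nodes fill whatever budget is left.
  val (nonMptToGet, remainingNonMpt) = pendingNonMpt.splitAt(nodesPerRequest)
  val (mptToGet, remainingMpt)       = pendingMpt.splitAt(nodesPerRequest - nonMptToGet.size)

  println(nonMptToGet ++ mptToGet) // List(evm1, storage1, m1, m2, m3): exactly nodesPerRequest
  println(remainingMpt)            // List(m4, m5): stays queued for the next request
}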