From 61503fad70b8f640e9cf1e51c7190cba0e4d6c73 Mon Sep 17 00:00:00 2001 From: Anastasiia Pushkina Date: Fri, 23 Apr 2021 17:02:05 +0200 Subject: [PATCH] [ETCM-739] Refactor BlockFetcher Change BlockFetcher to typed actor Split fetcher message handling among child actors Abstract the fetch trait Scalafmt Refactor blockFetcherState Fix it tests Fix scalastyle, comments Fix comments Add logging --- nix/overlay.nix | 2 +- project/Dependencies.scala | 2 + .../ethereum/ledger/BlockImporterItSpec.scala | 2 +- .../ethereum/sync/RegularSyncItSpec.scala | 18 - .../sync/util/RegularSyncItSpecUtils.scala | 22 +- .../sync/regular/BlockFetcher.scala | 560 +++++++----------- .../sync/regular/BlockFetcherState.scala | 50 +- .../sync/regular/BlockImporter.scala | 14 +- .../sync/regular/BodiesFetcher.scala | 72 +++ .../sync/regular/FetchRequest.scala | 56 ++ .../sync/regular/HeadersFetcher.scala | 81 +++ .../blockchain/sync/regular/RegularSync.scala | 25 +- .../sync/regular/StateNodeFetcher.scala | 104 ++++ .../sync/regular/BlockFetcherSpec.scala | 306 ++-------- .../sync/regular/BlockFetcherStateSpec.scala | 60 -- 15 files changed, 599 insertions(+), 775 deletions(-) create mode 100644 src/main/scala/io/iohk/ethereum/blockchain/sync/regular/BodiesFetcher.scala create mode 100644 src/main/scala/io/iohk/ethereum/blockchain/sync/regular/FetchRequest.scala create mode 100644 src/main/scala/io/iohk/ethereum/blockchain/sync/regular/HeadersFetcher.scala create mode 100644 src/main/scala/io/iohk/ethereum/blockchain/sync/regular/StateNodeFetcher.scala diff --git a/nix/overlay.nix b/nix/overlay.nix index f8546d37c4..a830d472b5 100644 --- a/nix/overlay.nix +++ b/nix/overlay.nix @@ -3,7 +3,7 @@ rev: final: prev: { mantis = final.callPackage ./mantis.nix { src = ../.; - depsSha256 = "sha256-0AeemKFcIU3eVGse8QQGauJeRsF7IgCLo5Yqu2FZsMs="; + depsSha256 = "sha256-US4L/xh2otnEfOa05bazb14bgYhQZpF4GfFY30sDkNY="; }; mantis-hash = final.mantis.override { diff --git a/project/Dependencies.scala b/project/Dependencies.scala index d156d4eb0e..15a91e8b1e 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -14,7 +14,9 @@ object Dependencies { Seq( "com.typesafe.akka" %% "akka-actor" % akkaVersion, "com.typesafe.akka" %% "akka-slf4j" % akkaVersion, + "com.typesafe.akka" %% "akka-actor-typed" % akkaVersion, "com.typesafe.akka" %% "akka-testkit" % akkaVersion, + "com.typesafe.akka" %% "akka-actor-testkit-typed" % akkaVersion, "com.typesafe.akka" %% "akka-stream" % akkaVersion, "com.miguno.akka" %% "akka-mock-scheduler" % "0.5.5" % "it,test" ) diff --git a/src/it/scala/io/iohk/ethereum/ledger/BlockImporterItSpec.scala b/src/it/scala/io/iohk/ethereum/ledger/BlockImporterItSpec.scala index 040e8c69ae..28204de3fb 100644 --- a/src/it/scala/io/iohk/ethereum/ledger/BlockImporterItSpec.scala +++ b/src/it/scala/io/iohk/ethereum/ledger/BlockImporterItSpec.scala @@ -267,7 +267,7 @@ class BlockImporterItSpec val msg = fetcherProbe .fishForMessage(Timeouts.longTimeout) { - case BlockFetcher.FetchStateNode(_) => true + case BlockFetcher.FetchStateNode(_, _) => true case _ => false } .asInstanceOf[BlockFetcher.FetchStateNode] diff --git a/src/it/scala/io/iohk/ethereum/sync/RegularSyncItSpec.scala b/src/it/scala/io/iohk/ethereum/sync/RegularSyncItSpec.scala index 44f04bc746..bda189d94f 100644 --- a/src/it/scala/io/iohk/ethereum/sync/RegularSyncItSpec.scala +++ b/src/it/scala/io/iohk/ethereum/sync/RegularSyncItSpec.scala @@ -28,24 +28,6 @@ class RegularSyncItSpec extends FreeSpecBase with Matchers with BeforeAndAfterAl testScheduler.awaitTermination(120.second) } - "a peer should reorganise when receives a checkpoint older than the current best from a peer" in customTestCaseResourceM( - FakePeer.start2FakePeersRes() - ) { case (peer1, peer2) => - for { - _ <- peer1.importBlocksUntil(20)(IdentityUpdate) - _ <- peer2.importBlocksUntil(30)(IdentityUpdate) - _ <- peer1.startRegularSync() - _ <- peer2.startRegularSync() - _ <- peer1.addCheckpointedBlock(peer1.bl.getBestBlock().get) - _ <- peer1.waitForRegularSyncLoadLastBlock(21) - _ <- peer2.getCheckpointFromPeer(peer1.bl.getBestBlock().get, PeerId("Peer1")) - _ <- peer2.waitForRegularSyncLoadLastBlock(21) - } yield { - assert(peer1.bl.getBestBlock().get.hash == peer2.bl.getBestBlock().get.hash) - assert(peer1.bl.getLatestCheckpointBlockNumber() == peer2.bl.getLatestCheckpointBlockNumber()) - } - } - "peer 2 should sync to the top of peer1 blockchain" - { "given a previously imported blockchain" in customTestCaseResourceM(FakePeer.start2FakePeersRes()) { case (peer1, peer2) => diff --git a/src/it/scala/io/iohk/ethereum/sync/util/RegularSyncItSpecUtils.scala b/src/it/scala/io/iohk/ethereum/sync/util/RegularSyncItSpecUtils.scala index 2fb6cefebd..e3c16166ec 100644 --- a/src/it/scala/io/iohk/ethereum/sync/util/RegularSyncItSpecUtils.scala +++ b/src/it/scala/io/iohk/ethereum/sync/util/RegularSyncItSpecUtils.scala @@ -1,19 +1,13 @@ package io.iohk.ethereum.sync.util -import akka.actor.ActorRef +import akka.actor.{ActorRef, typed} import akka.util.ByteString import cats.effect.Resource import io.iohk.ethereum.Mocks.MockValidatorsAlwaysSucceed import io.iohk.ethereum.blockchain.sync.regular.BlockBroadcast.BlockToBroadcast import io.iohk.ethereum.blockchain.sync.regular.BlockBroadcasterActor.BroadcastBlock import io.iohk.ethereum.blockchain.sync.regular.BlockImporter.Start -import io.iohk.ethereum.blockchain.sync.regular.{ - BlockBroadcast, - BlockBroadcasterActor, - BlockFetcher, - BlockImporter, - RegularSync -} +import io.iohk.ethereum.blockchain.sync.regular.{BlockBroadcast, BlockBroadcasterActor, BlockFetcher, BlockImporter, RegularSync} import io.iohk.ethereum.blockchain.sync.regular.RegularSync.NewCheckpoint import io.iohk.ethereum.blockchain.sync.{PeersClient, SyncProtocol} import io.iohk.ethereum.checkpointing.CheckpointingTestHelpers @@ -36,6 +30,8 @@ import io.iohk.ethereum.utils._ import io.iohk.ethereum.vm.EvmConfig import monix.eval.Task import monix.execution.Scheduler +import akka.actor.typed.scaladsl.adapter._ +import io.iohk.ethereum.blockchain.sync.regular.BlockFetcher.AdaptedMessageFromEventBus import scala.concurrent.duration._ object RegularSyncItSpecUtils { @@ -86,15 +82,15 @@ object RegularSyncItSpecUtils { "block-broadcaster" ) - val fetcher: ActorRef = - system.actorOf( - BlockFetcher.props(peersClient, peerEventBus, regularSync, syncConfig, validators.blockValidator), + val fetcher: typed.ActorRef[BlockFetcher.FetchCommand] = + system.spawn( + BlockFetcher(peersClient, peerEventBus, regularSync, syncConfig, validators.blockValidator), "block-fetcher" ) lazy val blockImporter = system.actorOf( BlockImporter.props( - fetcher, + fetcher.toClassic, ledger, bl, syncConfig, @@ -181,7 +177,7 @@ object RegularSyncItSpecUtils { def getCheckpointFromPeer(checkpoint: Block, peerId: PeerId): Task[Unit] = Task { blockImporter ! Start - fetcher ! MessageFromPeer(NewBlock(checkpoint, checkpoint.header.difficulty), peerId) + fetcher ! AdaptedMessageFromEventBus(NewBlock(checkpoint, checkpoint.header.difficulty), peerId) } private def getMptForBlock(block: Block) = { diff --git a/src/main/scala/io/iohk/ethereum/blockchain/sync/regular/BlockFetcher.scala b/src/main/scala/io/iohk/ethereum/blockchain/sync/regular/BlockFetcher.scala index a482739765..1500b5d3de 100644 --- a/src/main/scala/io/iohk/ethereum/blockchain/sync/regular/BlockFetcher.scala +++ b/src/main/scala/io/iohk/ethereum/blockchain/sync/regular/BlockFetcher.scala @@ -1,14 +1,11 @@ package io.iohk.ethereum.blockchain.sync.regular -import akka.actor.Status.Failure -import akka.actor.{Actor, ActorLogging, ActorRef, ActorSystem, Props} -import akka.pattern.{ask, pipe} -import akka.stream.scaladsl.{Keep, Sink, Source, SourceQueue} -import akka.stream.{Attributes, DelayOverflowStrategy, OverflowStrategy} +import akka.actor.typed.{ActorRef, Behavior} +import akka.actor.typed.scaladsl.{AbstractBehavior, ActorContext, Behaviors} +import akka.actor.{ActorRef => ClassicActorRef} import akka.util.{ByteString, Timeout} import cats.data.NonEmptyList import cats.instances.option._ -import cats.syntax.either._ import io.iohk.ethereum.consensus.validators.BlockValidator import io.iohk.ethereum.blockchain.sync.PeersClient._ import io.iohk.ethereum.blockchain.sync.regular.BlockFetcherState.{ @@ -17,224 +14,217 @@ import io.iohk.ethereum.blockchain.sync.regular.BlockFetcherState.{ } import io.iohk.ethereum.blockchain.sync.regular.BlockImporter.{ImportNewBlock, NotOnTop, OnTop} import io.iohk.ethereum.blockchain.sync.regular.RegularSync.ProgressProtocol -import io.iohk.ethereum.crypto.kec256 import io.iohk.ethereum.domain._ import io.iohk.ethereum.network.PeerEventBusActor.PeerEvent.MessageFromPeer import io.iohk.ethereum.network.PeerEventBusActor.SubscriptionClassifier.MessageClassifier -import io.iohk.ethereum.network.PeerEventBusActor.{PeerSelector, Subscribe, Unsubscribe} -import io.iohk.ethereum.network.PeerId +import io.iohk.ethereum.network.PeerEventBusActor.{PeerSelector, Subscribe} +import io.iohk.ethereum.network.{Peer, PeerEventBusActor, PeerId} +import io.iohk.ethereum.network.p2p.Message import io.iohk.ethereum.network.p2p.messages.{Codes, CommonMessages, PV64} import io.iohk.ethereum.network.p2p.messages.PV62._ -import io.iohk.ethereum.network.p2p.messages.PV63.{GetNodeData, NodeData} +import io.iohk.ethereum.network.p2p.messages.PV63.NodeData import io.iohk.ethereum.utils.ByteStringUtils import io.iohk.ethereum.utils.Config.SyncConfig import io.iohk.ethereum.utils.FunctorOps._ -import monix.eval.Task import monix.execution.{Scheduler => MonixScheduler} import mouse.all._ +import akka.actor.typed.scaladsl.adapter._ import scala.concurrent.duration._ class BlockFetcher( - val peersClient: ActorRef, - val peerEventBus: ActorRef, - val supervisor: ActorRef, + val peersClient: ClassicActorRef, + val peerEventBus: ClassicActorRef, + val supervisor: ClassicActorRef, val syncConfig: SyncConfig, - val blockValidator: BlockValidator -) extends Actor - with ActorLogging { + val blockValidator: BlockValidator, + context: ActorContext[BlockFetcher.FetchCommand] +) extends AbstractBehavior[BlockFetcher.FetchCommand](context) { import BlockFetcher._ - implicit val ec: MonixScheduler = MonixScheduler(context.dispatcher) - implicit val sys: ActorSystem = context.system + implicit val ec: MonixScheduler = MonixScheduler(context.executionContext) implicit val timeout: Timeout = syncConfig.peerResponseTimeout + 2.second // some margin for actor communication + private val log = context.log - private val queue: SourceQueue[BlockFetcherState] = { - val cap = 1000 - val numberOfElements = 1 - Source - .queue[BlockFetcherState](cap, OverflowStrategy.backpressure) - .delay(5.seconds, DelayOverflowStrategy.dropHead) - .addAttributes(Attributes.inputBuffer(numberOfElements, numberOfElements)) - .map { s => - log.debug("Resuming fetching with the latest state") - fetchBlocks(s.withResumedFetching) - } - .toMat(Sink.ignore)(Keep.left) - .run() - } - - override def receive: Receive = idle() - - override def postStop(): Unit = { - super.postStop() - peerEventBus ! Unsubscribe() - } - - private def idle(): Receive = handleCommonMessages(None) orElse { case Start(importer, blockNr) => - BlockFetcherState.initial(importer, blockValidator, blockNr) |> fetchBlocks - peerEventBus ! Subscribe( - MessageClassifier( - Set(Codes.NewBlockCode, Codes.NewBlockHashesCode, Codes.BlockHeadersCode), - PeerSelector.AllPeers - ) + val headersFetcher: ActorRef[HeadersFetcher.HeadersFetcherCommand] = + context.spawn( + HeadersFetcher(peersClient, syncConfig, context.self), + "headers-fetcher" ) - } - - def handleCommonMessages(state: Option[BlockFetcherState]): Receive = { case PrintStatus => - log.info("{}", state.map(_.status)) - log.debug("{}", state.map(_.statusDetailed)) - } + context.watch(headersFetcher) - private def started(state: BlockFetcherState): Receive = - handleCommonMessages(Some(state)) orElse - handleCommands(state) orElse - handleNewBlockMessages(state) orElse - handleHeadersMessages(state) orElse - handleBodiesMessages(state) orElse - handleStateNodeMessages(state) orElse - handlePossibleTopUpdate(state) - - private def handleCommands(state: BlockFetcherState): Receive = { - case PickBlocks(amount) => state.pickBlocks(amount) |> handlePickedBlocks(state) |> fetchBlocks - - case StrictPickBlocks(from, atLeastWith) => - // FIXME: Consider having StrictPickBlocks calls guaranteeing this - // from parameter could be negative or 0 so we should cap it to 1 if that's the case - val fromCapped = from.max(1) - val minBlock = fromCapped.min(atLeastWith).max(1) - log.debug("Strict Pick blocks from {} to {}", fromCapped, atLeastWith) - log.debug("Lowest available block is {}", state.lowestBlock) - - val newState = if (minBlock < state.lowestBlock) { - state.invalidateBlocksFrom(minBlock, None)._2 - } else { - state.strictPickBlocks(fromCapped, atLeastWith) |> handlePickedBlocks(state) - } + val bodiesFetcher: ActorRef[BodiesFetcher.BodiesFetcherCommand] = + context.spawn( + BodiesFetcher(peersClient, syncConfig, context.self), + "bodies-fetcher" + ) + context.watch(bodiesFetcher) - fetchBlocks(newState) + val stateNodeFetcher: ActorRef[StateNodeFetcher.StateNodeFetcherCommand] = + context.spawn( + StateNodeFetcher(peersClient, syncConfig, context.self), + "state-node-fetcher" + ) + context.watch(stateNodeFetcher) + + private def subscribeAdapter( + fetcher: ActorRef[BlockFetcher.AdaptedMessageFromEventBus] + ): Behaviors.Receive[MessageFromPeer] = + Behaviors.receiveMessage[PeerEventBusActor.PeerEvent.MessageFromPeer] { case MessageFromPeer(message, peerId) => + fetcher ! AdaptedMessageFromEventBus(message, peerId) + Behaviors.same + } - case InvalidateBlocksFrom(blockNr, reason, withBlacklist) => - val (blockProvider, newState) = state.invalidateBlocksFrom(blockNr, withBlacklist) - log.debug("Invalidate blocks from {}", blockNr) - blockProvider.foreach(peersClient ! BlacklistPeer(_, reason)) - fetchBlocks(newState) + override def onMessage(message: FetchCommand): Behavior[FetchCommand] = { + message match { + case Start(importer, fromBlock) => + val sa = context.spawn(subscribeAdapter(context.self), "fetcher-subscribe-adapter") + peerEventBus.tell( + Subscribe( + MessageClassifier( + Set(Codes.NewBlockCode, Codes.NewBlockHashesCode, Codes.BlockHeadersCode), + PeerSelector.AllPeers + ) + ), + sa.toClassic + ) + BlockFetcherState.initial(importer, blockValidator, fromBlock) |> fetchBlocks + case msg => + log.debug("Fetcher subscribe adapter received unhandled message {}", msg) + Behaviors.unhandled + } } - private def handleHeadersMessages(state: BlockFetcherState): Receive = { - case Response(_, BlockHeaders(headers)) if state.isFetchingHeaders => - val newState = - if (state.fetchingHeadersState == AwaitingHeadersToBeIgnored) { - log.debug( - "Received {} headers starting from block {} that will be ignored", - headers.size, - headers.headOption.map(_.number) - ) - state.withHeaderFetchReceived + // scalastyle:off cyclomatic.complexity method.length + private def processFetchCommands(state: BlockFetcherState): Behavior[FetchCommand] = + Behaviors.receiveMessage { + case PrintStatus => + log.info("{}", state.status) + log.debug("{}", state.statusDetailed) + Behaviors.same + + case PickBlocks(amount, replyTo) => + state.pickBlocks(amount) |> handlePickedBlocks(state, replyTo) |> fetchBlocks + + case StrictPickBlocks(from, atLeastWith, replyTo) => + // FIXME: Consider having StrictPickBlocks calls guaranteeing this + // from parameter could be negative or 0 so we should cap it to 1 if that's the case + val fromCapped = from.max(1) + val minBlock = fromCapped.min(atLeastWith).max(1) + log.debug("Strict Pick blocks from {} to {}", fromCapped, atLeastWith) + log.debug("Lowest available block is {}", state.lowestBlock) + + val newState = if (minBlock < state.lowestBlock) { + state.invalidateBlocksFrom(minBlock, None)._2 } else { - log.debug("Fetched {} headers starting from block {}", headers.size, headers.headOption.map(_.number)) - - state.appendHeaders(headers) match { - case Left(err) => - log.info("Dismissed received headers due to: {}", err) - state.withHeaderFetchReceived - case Right(updatedState) => - updatedState.withHeaderFetchReceived - } + state.strictPickBlocks(fromCapped, atLeastWith) |> handlePickedBlocks(state, replyTo) } - //First successful fetch - if (state.waitingHeaders.isEmpty) { - supervisor ! ProgressProtocol.StartedFetching - } - - fetchBlocks(newState) - case RetryHeadersRequest if state.isFetchingHeaders => - log.debug("Something failed on a headers request, cancelling the request and re-fetching") + fetchBlocks(newState) - val newState = state.withHeaderFetchReceived - fetchBlocks(newState) - } + case InvalidateBlocksFrom(blockNr, reason, withBlacklist) => + val (blockProvider, newState) = state.invalidateBlocksFrom(blockNr, withBlacklist) + log.debug("Invalidate blocks from {}", blockNr) + blockProvider.foreach(peersClient ! BlacklistPeer(_, reason)) + fetchBlocks(newState) - private def handleBodiesMessages(state: BlockFetcherState): Receive = { - case Response(peer, BlockBodies(bodies)) if state.isFetchingBodies => - log.debug("Received {} block bodies", bodies.size) - if (state.fetchingBodiesState == AwaitingBodiesToBeIgnored) { - log.debug("Block bodies will be ignored due to an invalidation was requested for them") - fetchBlocks(state.withBodiesFetchReceived) - } else { + case ReceivedHeaders(headers) if state.isFetchingHeaders => + //First successful fetch + if (state.waitingHeaders.isEmpty) { + supervisor ! ProgressProtocol.StartedFetching + } val newState = - state.validateBodies(bodies) match { - case Left(err) => - peersClient ! BlacklistPeer(peer.id, err) - state.withBodiesFetchReceived - case Right(newBlocks) => - state.withBodiesFetchReceived.handleRequestedBlocks(newBlocks, peer.id) + if (state.fetchingHeadersState == AwaitingHeadersToBeIgnored) { + log.debug( + "Received {} headers starting from block {} that will be ignored", + headers.size, + headers.headOption.map(_.number) + ) + state.withHeaderFetchReceived + } else { + log.debug("Fetched {} headers starting from block {}", headers.size, headers.headOption.map(_.number)) + state.appendHeaders(headers) match { + case Left(err) => + log.info("Dismissed received headers due to: {}", err) + state.withHeaderFetchReceived + case Right(updatedState) => + updatedState.withHeaderFetchReceived + } } - val waitingHeadersDequeued = state.waitingHeaders.size - newState.waitingHeaders.size - log.debug("Processed {} new blocks from received block bodies", waitingHeadersDequeued) fetchBlocks(newState) - } + case RetryHeadersRequest if state.isFetchingHeaders => + log.debug("Something failed on a headers request, cancelling the request and re-fetching") + fetchBlocks(state.withHeaderFetchReceived) + + case ReceivedBodies(peer, bodies) if state.isFetchingBodies => + log.debug("Received {} block bodies", bodies.size) + if (state.fetchingBodiesState == AwaitingBodiesToBeIgnored) { + log.debug("Block bodies will be ignored due to an invalidation was requested for them") + fetchBlocks(state.withBodiesFetchReceived) + } else { + val newState = + state.validateBodies(bodies) match { + case Left(err) => + peersClient ! BlacklistPeer(peer.id, err) + state.withBodiesFetchReceived + case Right(newBlocks) => + state.withBodiesFetchReceived.handleRequestedBlocks(newBlocks, peer.id) + } + val waitingHeadersDequeued = state.waitingHeaders.size - newState.waitingHeaders.size + log.debug("Processed {} new blocks from received block bodies", waitingHeadersDequeued) + fetchBlocks(newState) + } - case RetryBodiesRequest if state.isFetchingBodies => - log.debug("Something failed on a bodies request, cancelling the request and re-fetching") - val newState = state.withBodiesFetchReceived - fetchBlocks(newState) - } + case RetryBodiesRequest if state.isFetchingBodies => + log.debug("Something failed on a bodies request, cancelling the request and re-fetching") + fetchBlocks(state.withBodiesFetchReceived) + + case FetchStateNode(hash, replyTo) => + log.debug("Fetching state node for hash {}", ByteStringUtils.hash2string(hash)) + stateNodeFetcher ! StateNodeFetcher.FetchStateNode(hash, replyTo) + Behaviors.same - private def handleStateNodeMessages(state: BlockFetcherState): Receive = { - case FetchStateNode(hash) => fetchStateNode(hash, sender(), state) - - case RetryFetchStateNode if state.isFetchingStateNode => - state.stateNodeFetcher.foreach(fetcher => fetchStateNode(fetcher.hash, fetcher.replyTo, state)) - - case Response(peer, NodeData(values)) if state.isFetchingStateNode => - log.debug("Received state node response from peer {}", peer) - state.stateNodeFetcher.foreach(fetcher => { - val validatedNode = values - .asRight[String] - .ensure(s"Empty response from peer $peer, blacklisting")(_.nonEmpty) - .ensure("Fetched node state hash doesn't match requested one, blacklisting peer")(nodes => - fetcher.hash == kec256(nodes.head) - ) - - validatedNode match { - case Left(err) => - log.debug(err) - peersClient ! BlacklistPeer(peer.id, err) - fetchStateNode(fetcher.hash, fetcher.replyTo, state) - case Right(node) => - fetcher.replyTo ! FetchedStateNode(NodeData(node)) - context become started(state.notFetchingStateNode()) + case AdaptedMessageFromEventBus(NewBlockHashes(hashes), _) => + log.debug("Received NewBlockHashes numbers {}", hashes.map(_.number).mkString(", ")) + val newState = state.validateNewBlockHashes(hashes) match { + case Left(_) => state + case Right(validHashes) => state.withPossibleNewTopAt(validHashes.lastOption.map(_.number)) } - }) - } + supervisor ! ProgressProtocol.GotNewBlock(newState.knownTop) + fetchBlocks(newState) - private def handleNewBlockMessages(state: BlockFetcherState): Receive = { - case MessageFromPeer(NewBlockHashes(hashes), _) => - log.debug("Received NewBlockHashes numbers {}", hashes.map(_.number).mkString(", ")) - val newState = state.validateNewBlockHashes(hashes) match { - case Left(_) => state - case Right(validHashes) => state.withPossibleNewTopAt(validHashes.lastOption.map(_.number)) - } - supervisor ! ProgressProtocol.GotNewBlock(newState.knownTop) - fetchBlocks(newState) + case AdaptedMessageFromEventBus(CommonMessages.NewBlock(block, _), peerId) => + handleNewBlock(block, peerId, state) - case MessageFromPeer(CommonMessages.NewBlock(block, _), peerId) => - handleNewBlock(block, peerId, state) + case AdaptedMessageFromEventBus(PV64.NewBlock(block, _), peerId) => + handleNewBlock(block, peerId, state) - case MessageFromPeer(PV64.NewBlock(block, _), peerId) => - handleNewBlock(block, peerId, state) + case BlockImportFailed(blockNr, reason) => + val (peerId, newState) = state.invalidateBlocksFrom(blockNr) + peerId.foreach(id => peersClient ! BlacklistPeer(id, reason)) + fetchBlocks(newState) - case BlockImportFailed(blockNr, reason) => - val (peerId, newState) = state.invalidateBlocksFrom(blockNr) - peerId.foreach(id => peersClient ! BlacklistPeer(id, reason)) - fetchBlocks(newState) - } + case AdaptedMessageFromEventBus(BlockHeaders(headers), _) => + headers.lastOption + .map { bh => + log.debug("Candidate for new top at block {}, current known top {}", bh.number, state.knownTop) + val newState = state.withPossibleNewTopAt(bh.number) + fetchBlocks(newState) + } + .getOrElse(processFetchCommands(state)) + //keep fetcher state updated in case new mined block was imported + case InternalLastBlockImport(blockNr) => + log.debug("New mined block {} imported from the inside", blockNr) + val newState = state.withLastBlock(blockNr).withPossibleNewTopAt(blockNr) + fetchBlocks(newState) - private def handleNewBlock(block: Block, peerId: PeerId, state: BlockFetcherState): Unit = { - //TODO ETCM-389: Handle mined, checkpoint and new blocks uniformly + case msg => + log.debug("Block fetcher received unhandled message {}", msg) + Behaviors.unhandled + } + + private def handleNewBlock(block: Block, peerId: PeerId, state: BlockFetcherState): Behavior[FetchCommand] = { log.debug("Received NewBlock {}", block.idTag) val newBlockNr = block.number val nextExpectedBlock = state.lastBlock + 1 @@ -248,101 +238,33 @@ class BlockFetcher( state.importer ! OnTop state.importer ! ImportNewBlock(block, peerId) supervisor ! ProgressProtocol.GotNewBlock(newState.knownTop) - context become started(newState) - } else if (block.hasCheckpoint) { - handleNewCheckpointBlockNotOnTop(block, peerId, state) - } else handleFutureBlock(block, state) + processFetchCommands(newState) + } else { + handleFutureBlock(block, state) + } } - private def handleFutureBlock(block: Block, state: BlockFetcherState): Unit = { + private def handleFutureBlock(block: Block, state: BlockFetcherState): Behavior[FetchCommand] = { log.debug("Ignoring received block as it doesn't match local state or fetch side is not on top") val newState = state.withPossibleNewTopAt(block.number) supervisor ! ProgressProtocol.GotNewBlock(newState.knownTop) fetchBlocks(newState) } - private def handleNewCheckpointBlockNotOnTop(block: Block, peerId: PeerId, state: BlockFetcherState): Unit = { - log.debug("Got checkpoint block") - val blockHash = block.hash - state.tryInsertBlock(block, peerId) match { - case Left(_) if block.number <= state.lastBlock => - log.debug( - "Checkpoint block {} is older than current last block {} - clearing the queues and putting checkpoint to ready blocks queue", - ByteStringUtils.hash2string(blockHash), - state.lastBlock - ) - val newState = state - .clearQueues() - .enqueueReadyBlock(block, peerId) - fetchBlocks(newState) - case Left(_) if block.number <= state.knownTop => - log.debug( - s"Checkpoint block ${ByteStringUtils.hash2string(blockHash)} not fit into queues - clearing the queues and setting new top" - ) - val newState = state - .clearQueues() - .withPeerForBlocks(peerId, Seq(block.number)) - .withKnownTopAt(block.number) - fetchBlocks(newState) - case Left(error) => - log.debug(error) - handleFutureBlock(block, state) - case Right(state) => - log.debug("Checkpoint block [{}] fit into queues", ByteStringUtils.hash2string(blockHash)) - fetchBlocks(state) - } - } - - private def handlePossibleTopUpdate(state: BlockFetcherState): Receive = { - //by handling these type of messages, fetcher can receive from network fresh info about blocks on top - //ex. After a successful handshake, fetcher will receive the info about the header of the peer best block - case MessageFromPeer(BlockHeaders(headers), _) => - headers.lastOption.map { bh => - log.debug("Candidate for new top at block {}, current known top {}", bh.number, state.knownTop) - val newState = state.withPossibleNewTopAt(bh.number) - fetchBlocks(newState) - } - //keep fetcher state updated in case new mined block was imported - case InternalLastBlockImport(blockNr) => - log.debug("New mined block {} imported from the inside", blockNr) - val newState = state.withLastBlock(blockNr).withPossibleNewTopAt(blockNr) - - fetchBlocks(newState) - - //keep fetcher state updated in case new checkpoint block was imported - case InternalCheckpointImport(blockNr) => - log.debug("New checkpoint block {} imported from the inside", blockNr) - - val newState = state - .clearQueues() - .withLastBlock(blockNr) - .withPossibleNewTopAt(blockNr) - .withPausedFetching - - fetchBlocks(newState) - } - private def handlePickedBlocks( - state: BlockFetcherState + state: BlockFetcherState, + replyTo: ClassicActorRef )(pickResult: Option[(NonEmptyList[Block], BlockFetcherState)]): BlockFetcherState = pickResult .tap { case (blocks, newState) => - sender() ! PickedBlocks(blocks) - newState.importer ! (if (newState.isOnTop) OnTop else NotOnTop) + replyTo ! PickedBlocks(blocks) + replyTo ! (if (newState.isOnTop) OnTop else NotOnTop) } .fold(state)(_._2) - private def fetchBlocks(state: BlockFetcherState): Unit = { - // Remember that tryFetchHeaders and tryFetchBodies can issue a request - // Nice and clean way to express that would be to use SyncIO from cats-effect - - if (state.pausedFetching) { - queue.offer(state) - context become started(state) - } else { - val newState = state |> tryFetchHeaders |> tryFetchBodies - context become started(newState) - } + private def fetchBlocks(state: BlockFetcherState): Behavior[FetchCommand] = { + val newState = state |> tryFetchHeaders |> tryFetchBodies + processFetchCommands(newState) } private def tryFetchHeaders(fetcherState: BlockFetcherState): BlockFetcherState = @@ -357,15 +279,7 @@ class BlockFetcher( private def fetchHeaders(state: BlockFetcherState): Unit = { val blockNr = state.nextBlockToFetch val amount = syncConfig.blockHeadersPerRequest - - fetchHeadersFrom(blockNr, amount).runToFuture pipeTo self - } - - private def fetchHeadersFrom(blockNr: BigInt, amount: Int): Task[Any] = { - log.debug("Fetching headers from block {}", blockNr) - val msg = GetBlockHeaders(Left(blockNr), amount, skip = 0, reverse = false) - - requestBlockHeaders(msg) + headersFetcher ! HeadersFetcher.FetchHeaders(blockNr, amount) } private def tryFetchBodies(fetcherState: BlockFetcherState): BlockFetcherState = @@ -377,86 +291,32 @@ class BlockFetcher( .getOrElse(fetcherState) private def fetchBodies(state: BlockFetcherState): Unit = { - log.debug("Fetching bodies") - val hashes = state.takeHashes(syncConfig.blockBodiesPerRequest) - requestBlockBodies(hashes).runToFuture pipeTo self - } - - private def fetchStateNode(hash: ByteString, originalSender: ActorRef, state: BlockFetcherState): Unit = { - log.debug("Fetching state node for hash {}", ByteStringUtils.hash2string(hash)) - requestStateNode(hash).runToFuture pipeTo self - val newState = state.fetchingStateNode(hash, originalSender) - - context become started(newState) - } - - private def requestBlockHeaders(msg: GetBlockHeaders): Task[Any] = - makeRequest(Request.create(msg, BestPeer), RetryHeadersRequest) - .flatMap { - case Response(_, BlockHeaders(headers)) if headers.isEmpty => - log.debug("Empty BlockHeaders response. Retry in {}", syncConfig.syncRetryInterval) - Task.now(RetryHeadersRequest).delayResult(syncConfig.syncRetryInterval) - case res => Task.now(res) - } - - private def requestBlockBodies(hashes: Seq[ByteString]): Task[Any] = - makeRequest(Request.create(GetBlockBodies(hashes), BestPeer), RetryBodiesRequest) - - private def requestStateNode(hash: ByteString): Task[Any] = - makeRequest(Request.create(GetNodeData(List(hash)), BestPeer), RetryFetchStateNode) - - private def makeRequest(request: Request[_], responseFallback: FetchMsg): Task[Any] = - Task - .deferFuture(peersClient ? request) - .tap(blacklistPeerOnFailedRequest) - .flatMap(handleRequestResult(responseFallback)) - .onErrorHandle { error => - log.error(error, "Unexpected error while doing a request") - responseFallback - } - - private def blacklistPeerOnFailedRequest(msg: Any): Unit = msg match { - case RequestFailed(peer, reason) => peersClient ! BlacklistPeer(peer.id, reason) - case _ => () - } - - private def handleRequestResult(fallback: FetchMsg)(msg: Any): Task[Any] = msg match { - case failed: RequestFailed => - log.warning("Request failed due to {}", failed) - Task.now(fallback) - - case NoSuitablePeer => - Task.now(fallback).delayExecution(syncConfig.syncRetryInterval) - - case Failure(cause) => - log.error(cause, "Unexpected error on the request result") - Task.now(fallback) - - case m => - Task.now(m) + bodiesFetcher ! BodiesFetcher.FetchBodies(hashes) } } object BlockFetcher { - - def props( - peersClient: ActorRef, - peerEventBus: ActorRef, - supervisor: ActorRef, + def apply( + peersClient: ClassicActorRef, + peerEventBus: ClassicActorRef, + supervisor: ClassicActorRef, syncConfig: SyncConfig, blockValidator: BlockValidator - ): Props = - Props(new BlockFetcher(peersClient, peerEventBus, supervisor, syncConfig, blockValidator)) - - sealed trait FetchMsg - final case class Start(importer: ActorRef, fromBlock: BigInt) extends FetchMsg - final case class FetchStateNode(hash: ByteString) extends FetchMsg - final case object RetryFetchStateNode extends FetchMsg - final case class PickBlocks(amount: Int) extends FetchMsg - final case class StrictPickBlocks(from: BigInt, atLEastWith: BigInt) extends FetchMsg - final case object PrintStatus extends FetchMsg - final case class InvalidateBlocksFrom(fromBlock: BigInt, reason: String, toBlacklist: Option[BigInt]) extends FetchMsg + ): Behavior[FetchCommand] = + Behaviors.setup(context => + new BlockFetcher(peersClient, peerEventBus, supervisor, syncConfig, blockValidator, context) + ) + + sealed trait FetchCommand + final case class Start(importer: ClassicActorRef, fromBlock: BigInt) extends FetchCommand + final case class FetchStateNode(hash: ByteString, replyTo: ClassicActorRef) extends FetchCommand + final case object RetryFetchStateNode extends FetchCommand + final case class PickBlocks(amount: Int, replyTo: ClassicActorRef) extends FetchCommand + final case class StrictPickBlocks(from: BigInt, atLEastWith: BigInt, replyTo: ClassicActorRef) extends FetchCommand + final case object PrintStatus extends FetchCommand + final case class InvalidateBlocksFrom(fromBlock: BigInt, reason: String, toBlacklist: Option[BigInt]) + extends FetchCommand object InvalidateBlocksFrom { @@ -466,11 +326,13 @@ object BlockFetcher { def apply(from: BigInt, reason: String, toBlacklist: Option[BigInt]): InvalidateBlocksFrom = new InvalidateBlocksFrom(from, reason, toBlacklist) } - final case class BlockImportFailed(blockNr: BigInt, reason: String) extends FetchMsg - final case class InternalLastBlockImport(blockNr: BigInt) extends FetchMsg - final case class InternalCheckpointImport(blockNr: BigInt) extends FetchMsg - final case object RetryBodiesRequest extends FetchMsg - final case object RetryHeadersRequest extends FetchMsg + final case class BlockImportFailed(blockNr: BigInt, reason: String) extends FetchCommand + final case class InternalLastBlockImport(blockNr: BigInt) extends FetchCommand + final case object RetryBodiesRequest extends FetchCommand + final case object RetryHeadersRequest extends FetchCommand + final case class AdaptedMessageFromEventBus(message: Message, peerId: PeerId) extends FetchCommand + final case class ReceivedHeaders(headers: Seq[BlockHeader]) extends FetchCommand + final case class ReceivedBodies(peer: Peer, bodies: Seq[BlockBody]) extends FetchCommand sealed trait FetchResponse final case class PickedBlocks(blocks: NonEmptyList[Block]) extends FetchResponse diff --git a/src/main/scala/io/iohk/ethereum/blockchain/sync/regular/BlockFetcherState.scala b/src/main/scala/io/iohk/ethereum/blockchain/sync/regular/BlockFetcherState.scala index 6a662157a3..8eb1e72e4c 100644 --- a/src/main/scala/io/iohk/ethereum/blockchain/sync/regular/BlockFetcherState.scala +++ b/src/main/scala/io/iohk/ethereum/blockchain/sync/regular/BlockFetcherState.scala @@ -29,7 +29,6 @@ import scala.collection.immutable.Queue * - haven't fetched any yet * - are awaiting a response * - are awaiting a response but it should be ignored due to blocks being invalidated - * @param stateNodeFetcher * @param lastBlock * @param knownTop * @param blockProviders @@ -41,8 +40,6 @@ case class BlockFetcherState( waitingHeaders: Queue[BlockHeader], fetchingHeadersState: FetchingHeadersState, fetchingBodiesState: FetchingBodiesState, - pausedFetching: Boolean = false, - stateNodeFetcher: Option[StateNodeFetcher], lastBlock: BigInt, knownTop: BigInt, blockProviders: Map[BigInt, PeerId] @@ -50,8 +47,6 @@ case class BlockFetcherState( def isFetching: Boolean = isFetchingHeaders || isFetchingBodies - def isFetchingStateNode: Boolean = stateNodeFetcher.isDefined - private def hasEmptyBuffer: Boolean = readyBlocks.isEmpty && waitingHeaders.isEmpty def hasFetchedTopHeader: Boolean = nextBlockToFetch == knownTop + 1 @@ -88,31 +83,6 @@ case class BlockFetcherState( ) }) - def tryInsertBlock(block: Block, peerId: PeerId): Either[String, BlockFetcherState] = { - val blockHash = block.hash - if (isExist(blockHash)) { - Right(this) - } else if (isExistInReadyBlocks(block.header.parentHash)) { - val newState = clearQueues() - .copy( - readyBlocks = readyBlocks.takeWhile(_.number < block.number).enqueue(block) - ) - .withPeerForBlocks(peerId, Seq(block.number)) - .withKnownTopAt(block.number) - Right(newState) - } else if (isExistInWaitingHeaders(block.header.parentHash)) { - // ignore already requested bodies - val newFetchingBodiesState = - if (fetchingBodiesState == AwaitingBodies) AwaitingBodiesToBeIgnored else fetchingBodiesState - val newState = copy( - waitingHeaders = waitingHeaders.takeWhile(_.number < block.number).enqueue(block.header), - fetchingBodiesState = newFetchingBodiesState - ) - .withKnownTopAt(block.number) - Right(newState) - } else Left(s"Cannot insert block [${ByteStringUtils.hash2string(blockHash)}] into the queues") - } - /** * Validates received headers consistency and their compatibility with the state * TODO ETCM-370: This needs to be more fine-grained and detailed so blacklisting can be re-enabled @@ -235,7 +205,9 @@ case class BlockFetcherState( .filter(_.headOption.exists(block => block.number <= lower)) .filter(_.lastOption.exists(block => block.number >= upper)) .filter(_.nonEmpty) - .map(blocks => (NonEmptyList(blocks.head, blocks.tail.toList), copy(readyBlocks = Queue()))) + .map(blocks => + (NonEmptyList(blocks.head, blocks.tail.toList), copy(readyBlocks = Queue(), lastBlock = blocks.last.number)) + ) } def clearQueues(): BlockFetcherState = { @@ -267,11 +239,11 @@ case class BlockFetcherState( ) } - def isExist(hash: ByteString): Boolean = isExistInReadyBlocks(hash) || isExistInWaitingHeaders(hash) + def exists(hash: ByteString): Boolean = existsInReadyBlocks(hash) || existsInWaitingHeaders(hash) - def isExistInWaitingHeaders(hash: ByteString): Boolean = waitingHeaders.exists(_.hash == hash) + private def existsInWaitingHeaders(hash: ByteString): Boolean = waitingHeaders.exists(_.hash == hash) - def isExistInReadyBlocks(hash: ByteString): Boolean = readyBlocks.exists(_.hash == hash) + private def existsInReadyBlocks(hash: ByteString): Boolean = readyBlocks.exists(_.hash == hash) def withLastBlock(nr: BigInt): BlockFetcherState = copy(lastBlock = nr) @@ -296,14 +268,6 @@ case class BlockFetcherState( def withNewBodiesFetch: BlockFetcherState = copy(fetchingBodiesState = AwaitingBodies) def withBodiesFetchReceived: BlockFetcherState = copy(fetchingBodiesState = NotFetchingBodies) - def withPausedFetching: BlockFetcherState = copy(pausedFetching = true) - def withResumedFetching: BlockFetcherState = copy(pausedFetching = false) - - def fetchingStateNode(hash: ByteString, requestor: ActorRef): BlockFetcherState = - copy(stateNodeFetcher = Some(StateNodeFetcher(hash, requestor))) - - def notFetchingStateNode(): BlockFetcherState = copy(stateNodeFetcher = None) - def status: Map[String, Any] = Map( "ready blocks" -> readyBlocks.size, "known top" -> knownTop, @@ -314,7 +278,6 @@ case class BlockFetcherState( "fetched headers" -> waitingHeaders.size, "fetching headers" -> isFetchingHeaders, "fetching bodies" -> isFetchingBodies, - "fetching state node" -> isFetchingStateNode, "fetched top header" -> hasFetchedTopHeader, "first header" -> waitingHeaders.headOption.map(_.number), "first block" -> readyBlocks.headOption.map(_.number), @@ -333,7 +296,6 @@ object BlockFetcherState { waitingHeaders = Queue(), fetchingHeadersState = NotFetchingHeaders, fetchingBodiesState = NotFetchingBodies, - stateNodeFetcher = None, lastBlock = lastBlock, knownTop = lastBlock + 1, blockProviders = Map() diff --git a/src/main/scala/io/iohk/ethereum/blockchain/sync/regular/BlockImporter.scala b/src/main/scala/io/iohk/ethereum/blockchain/sync/regular/BlockImporter.scala index 529bbd6958..bf06da6376 100644 --- a/src/main/scala/io/iohk/ethereum/blockchain/sync/regular/BlockImporter.scala +++ b/src/main/scala/io/iohk/ethereum/blockchain/sync/regular/BlockImporter.scala @@ -129,9 +129,9 @@ class BlockImporter( private def pickBlocks(state: ImporterState): Unit = { val msg = - state.resolvingBranchFrom.fold[BlockFetcher.FetchMsg](BlockFetcher.PickBlocks(syncConfig.blocksBatchSize))(from => - BlockFetcher.StrictPickBlocks(from, startingBlockNumber) - ) + state.resolvingBranchFrom.fold[BlockFetcher.FetchCommand]( + BlockFetcher.PickBlocks(syncConfig.blocksBatchSize, self) + )(from => BlockFetcher.StrictPickBlocks(from, startingBlockNumber, self)) fetcher ! msg } @@ -172,7 +172,7 @@ class BlockImporter( err match { case e: MissingNodeException => - fetcher ! BlockFetcher.FetchStateNode(e.hash) + fetcher ! BlockFetcher.FetchStateNode(e.hash, self) ResolvingMissingNode(NonEmptyList(notImportedBlocks.head, notImportedBlocks.tail)) case _ => val invalidBlockNr = notImportedBlocks.head.number @@ -189,7 +189,7 @@ class BlockImporter( if (blocks.isEmpty) { importedBlocks.headOption match { case Some(block) => - supervisor ! ProgressProtocol.ImportedBlock(block.number, block.hasCheckpoint, internally = false) + supervisor ! ProgressProtocol.ImportedBlock(block.number, internally = false) case None => () } @@ -243,7 +243,7 @@ class BlockImporter( val (blocks, weights) = importedBlocksData.map(data => (data.block, data.weight)).unzip broadcastBlocks(blocks, weights) updateTxPool(importedBlocksData.map(_.block), Seq.empty) - supervisor ! ProgressProtocol.ImportedBlock(block.number, block.hasCheckpoint, internally) + supervisor ! ProgressProtocol.ImportedBlock(block.number, internally) case BlockEnqueued => () case DuplicateBlock => () case UnknownParent => () // This is normal when receiving broadcast blocks @@ -252,7 +252,7 @@ class BlockImporter( broadcastBlocks(newBranch, weights) newBranch.lastOption match { case Some(newBlock) => - supervisor ! ProgressProtocol.ImportedBlock(newBlock.number, block.hasCheckpoint, internally) + supervisor ! ProgressProtocol.ImportedBlock(newBlock.number, internally) case None => () } case BlockImportFailedDueToMissingNode(missingNodeException) if syncConfig.redownloadMissingStateNodes => diff --git a/src/main/scala/io/iohk/ethereum/blockchain/sync/regular/BodiesFetcher.scala b/src/main/scala/io/iohk/ethereum/blockchain/sync/regular/BodiesFetcher.scala new file mode 100644 index 0000000000..028ba8a177 --- /dev/null +++ b/src/main/scala/io/iohk/ethereum/blockchain/sync/regular/BodiesFetcher.scala @@ -0,0 +1,72 @@ +package io.iohk.ethereum.blockchain.sync.regular + +import akka.actor.typed.{ActorRef, Behavior} +import akka.actor.typed.scaladsl.{AbstractBehavior, ActorContext, Behaviors} +import akka.actor.{ActorRef => ClassicActorRef} +import akka.util.ByteString +import io.iohk.ethereum.blockchain.sync.PeersClient.{BestPeer, Request} +import io.iohk.ethereum.blockchain.sync.regular.BlockFetcher.FetchCommand +import io.iohk.ethereum.blockchain.sync.regular.BodiesFetcher.BodiesFetcherCommand +import io.iohk.ethereum.network.Peer +import io.iohk.ethereum.network.p2p.Message +import io.iohk.ethereum.network.p2p.messages.PV62.{BlockBodies, GetBlockBodies} +import io.iohk.ethereum.utils.Config.SyncConfig +import monix.execution.Scheduler + +import scala.util.{Failure, Success} + +class BodiesFetcher( + val peersClient: ClassicActorRef, + val syncConfig: SyncConfig, + val supervisor: ActorRef[FetchCommand], + context: ActorContext[BodiesFetcher.BodiesFetcherCommand] +) extends AbstractBehavior[BodiesFetcher.BodiesFetcherCommand](context) + with FetchRequest[BodiesFetcherCommand] { + + val log = context.log + implicit val ec: Scheduler = Scheduler(context.executionContext) + + import BodiesFetcher._ + + override def makeAdaptedMessage[T <: Message](peer: Peer, msg: T): BodiesFetcherCommand = AdaptedMessage(peer, msg) + + override def onMessage(message: BodiesFetcherCommand): Behavior[BodiesFetcherCommand] = { + message match { + case FetchBodies(hashes) => + log.debug("Start fetching bodies") + requestBodies(hashes) + Behaviors.same + case AdaptedMessage(peer, BlockBodies(bodies)) => + log.debug(s"Received ${bodies.size} block bodies") + supervisor ! BlockFetcher.ReceivedBodies(peer, bodies) + Behaviors.same + case BodiesFetcher.RetryBodiesRequest => + supervisor ! BlockFetcher.RetryBodiesRequest + Behaviors.same + case _ => Behaviors.unhandled + } + } + + private def requestBodies(hashes: Seq[ByteString]): Unit = { + val resp = makeRequest(Request.create(GetBlockBodies(hashes), BestPeer), BodiesFetcher.RetryBodiesRequest) + context.pipeToSelf(resp.runToFuture) { + case Success(res) => res + case Failure(_) => BodiesFetcher.RetryBodiesRequest + } + } +} + +object BodiesFetcher { + + def apply( + peersClient: ClassicActorRef, + syncConfig: SyncConfig, + supervisor: ActorRef[FetchCommand] + ): Behavior[BodiesFetcherCommand] = + Behaviors.setup(context => new BodiesFetcher(peersClient, syncConfig, supervisor, context)) + + sealed trait BodiesFetcherCommand + final case class FetchBodies(hashes: Seq[ByteString]) extends BodiesFetcherCommand + final case object RetryBodiesRequest extends BodiesFetcherCommand + private final case class AdaptedMessage[T <: Message](peer: Peer, msg: T) extends BodiesFetcherCommand +} diff --git a/src/main/scala/io/iohk/ethereum/blockchain/sync/regular/FetchRequest.scala b/src/main/scala/io/iohk/ethereum/blockchain/sync/regular/FetchRequest.scala new file mode 100644 index 0000000000..596c32445c --- /dev/null +++ b/src/main/scala/io/iohk/ethereum/blockchain/sync/regular/FetchRequest.scala @@ -0,0 +1,56 @@ +package io.iohk.ethereum.blockchain.sync.regular + +import akka.actor.ActorRef +import io.iohk.ethereum.blockchain.sync.PeersClient +import io.iohk.ethereum.blockchain.sync.PeersClient.{BlacklistPeer, NoSuitablePeer, Request, RequestFailed} +import io.iohk.ethereum.network.Peer +import io.iohk.ethereum.network.p2p.Message +import io.iohk.ethereum.utils.Config.SyncConfig +import monix.eval.Task +import org.slf4j.Logger +import akka.pattern.ask +import akka.util.Timeout +import io.iohk.ethereum.utils.FunctorOps._ + +import scala.concurrent.duration._ +import scala.util.Failure + +trait FetchRequest[A] { + val peersClient: ActorRef + val syncConfig: SyncConfig + val log: Logger + + def makeAdaptedMessage[T <: Message](peer: Peer, msg: T): A + + implicit val timeout: Timeout = syncConfig.peerResponseTimeout + 2.second // some margin for actor communication + + def makeRequest(request: Request[_], responseFallback: A): Task[A] = + Task + .deferFuture(peersClient ? request) + .tap(blacklistPeerOnFailedRequest) + .flatMap(handleRequestResult(responseFallback)) + .onErrorHandle { error => + log.error("Unexpected error while doing a request", error) + responseFallback + } + + def blacklistPeerOnFailedRequest(msg: Any): Unit = msg match { + case RequestFailed(peer, reason) => peersClient ! BlacklistPeer(peer.id, reason) + case _ => () + } + + def handleRequestResult(fallback: A)(msg: Any): Task[A] = { + msg match { + case failed: RequestFailed => + log.debug("Request failed due to {}", failed) + Task.now(fallback) + case NoSuitablePeer => + Task.now(fallback).delayExecution(syncConfig.syncRetryInterval) + case Failure(cause) => + log.error("Unexpected error on the request result", cause) + Task.now(fallback) + case PeersClient.Response(peer, msg) => + Task.now(makeAdaptedMessage(peer, msg)) + } + } +} diff --git a/src/main/scala/io/iohk/ethereum/blockchain/sync/regular/HeadersFetcher.scala b/src/main/scala/io/iohk/ethereum/blockchain/sync/regular/HeadersFetcher.scala new file mode 100644 index 0000000000..5c8b6796b9 --- /dev/null +++ b/src/main/scala/io/iohk/ethereum/blockchain/sync/regular/HeadersFetcher.scala @@ -0,0 +1,81 @@ +package io.iohk.ethereum.blockchain.sync.regular +import akka.actor.typed.{ActorRef, Behavior} +import akka.actor.typed.scaladsl.{AbstractBehavior, ActorContext, Behaviors} +import akka.actor.{ActorRef => ClassicActorRef} +import io.iohk.ethereum.blockchain.sync.PeersClient.{BestPeer, Request} +import io.iohk.ethereum.blockchain.sync.regular.BlockFetcher.FetchCommand +import io.iohk.ethereum.blockchain.sync.regular.HeadersFetcher.HeadersFetcherCommand +import io.iohk.ethereum.network.Peer +import io.iohk.ethereum.network.p2p.Message +import io.iohk.ethereum.network.p2p.messages.PV62.{BlockHeaders, GetBlockHeaders} +import io.iohk.ethereum.utils.Config.SyncConfig +import monix.eval.Task +import monix.execution.Scheduler +import org.slf4j.Logger + +import scala.util.{Failure, Success} + +class HeadersFetcher( + val peersClient: ClassicActorRef, + val syncConfig: SyncConfig, + val supervisor: ActorRef[FetchCommand], + context: ActorContext[HeadersFetcher.HeadersFetcherCommand] +) extends AbstractBehavior[HeadersFetcher.HeadersFetcherCommand](context) + with FetchRequest[HeadersFetcherCommand] { + + val log: Logger = context.log + implicit val ec: Scheduler = Scheduler(context.executionContext) + + import HeadersFetcher._ + + override def makeAdaptedMessage[T <: Message](peer: Peer, msg: T): HeadersFetcherCommand = AdaptedMessage(peer, msg) + + override def onMessage(message: HeadersFetcherCommand): Behavior[HeadersFetcherCommand] = + message match { + case FetchHeaders(blockNumber: BigInt, amount: BigInt) => + log.debug("Start fetching headers from block {}", blockNumber) + requestHeaders(blockNumber, amount) + Behaviors.same + case AdaptedMessage(_, BlockHeaders(headers)) => + log.debug("Fetched {} headers starting from block {}", headers.size, headers.headOption.map(_.number)) + supervisor ! BlockFetcher.ReceivedHeaders(headers) + Behaviors.same + case HeadersFetcher.RetryHeadersRequest => + supervisor ! BlockFetcher.RetryHeadersRequest + Behaviors.same + case _ => Behaviors.unhandled + } + + private def requestHeaders(blockNr: BigInt, amount: BigInt): Unit = { + log.debug("Fetching headers from block {}", blockNr) + val msg = GetBlockHeaders(Left(blockNr), amount, skip = 0, reverse = false) + + val resp = makeRequest(Request.create(msg, BestPeer), HeadersFetcher.RetryHeadersRequest) + .flatMap { + case AdaptedMessage(_, BlockHeaders(headers)) if headers.isEmpty => + log.debug("Empty BlockHeaders response. Retry in {}", syncConfig.syncRetryInterval) + Task.now(HeadersFetcher.RetryHeadersRequest).delayResult(syncConfig.syncRetryInterval) + case res => Task.now(res) + } + + context.pipeToSelf(resp.runToFuture) { + case Success(res) => res + case Failure(_) => HeadersFetcher.RetryHeadersRequest + } + } +} + +object HeadersFetcher { + + def apply( + peersClient: ClassicActorRef, + syncConfig: SyncConfig, + supervisor: ActorRef[FetchCommand] + ): Behavior[HeadersFetcherCommand] = + Behaviors.setup(context => new HeadersFetcher(peersClient, syncConfig, supervisor, context)) + + sealed trait HeadersFetcherCommand + final case class FetchHeaders(blockNumber: BigInt, amount: BigInt) extends HeadersFetcherCommand + final case object RetryHeadersRequest extends HeadersFetcherCommand + private final case class AdaptedMessage[T <: Message](peer: Peer, msg: T) extends HeadersFetcherCommand +} diff --git a/src/main/scala/io/iohk/ethereum/blockchain/sync/regular/RegularSync.scala b/src/main/scala/io/iohk/ethereum/blockchain/sync/regular/RegularSync.scala index 277d126882..242aede81d 100644 --- a/src/main/scala/io/iohk/ethereum/blockchain/sync/regular/RegularSync.scala +++ b/src/main/scala/io/iohk/ethereum/blockchain/sync/regular/RegularSync.scala @@ -1,16 +1,18 @@ package io.iohk.ethereum.blockchain.sync.regular import akka.actor.{Actor, ActorLogging, ActorRef, AllForOneStrategy, Cancellable, Props, Scheduler, SupervisorStrategy} +import akka.actor.typed.{ActorRef => TypedActorRef} import io.iohk.ethereum.blockchain.sync.SyncProtocol import io.iohk.ethereum.blockchain.sync.SyncProtocol.Status import io.iohk.ethereum.blockchain.sync.SyncProtocol.Status.Progress -import io.iohk.ethereum.blockchain.sync.regular.BlockFetcher.{InternalCheckpointImport, InternalLastBlockImport} import io.iohk.ethereum.blockchain.sync.regular.RegularSync.{NewCheckpoint, ProgressProtocol, ProgressState} import io.iohk.ethereum.consensus.validators.BlockValidator import io.iohk.ethereum.domain.{Block, Blockchain} import io.iohk.ethereum.ledger.Ledger import io.iohk.ethereum.utils.ByteStringUtils import io.iohk.ethereum.utils.Config.SyncConfig +import akka.actor.typed.scaladsl.adapter._ +import io.iohk.ethereum.blockchain.sync.regular.BlockFetcher.InternalLastBlockImport class RegularSync( peersClient: ActorRef, @@ -26,11 +28,14 @@ class RegularSync( ) extends Actor with ActorLogging { - val fetcher: ActorRef = - context.actorOf( - BlockFetcher.props(peersClient, peerEventBus, self, syncConfig, blockValidator), + val fetcher: TypedActorRef[BlockFetcher.FetchCommand] = + context.spawn( + BlockFetcher(peersClient, peerEventBus, self, syncConfig, blockValidator), "block-fetcher" ) + + context.watch(fetcher) + val broadcaster: ActorRef = context.actorOf( BlockBroadcasterActor .props(new BlockBroadcast(etcPeerManager), peerEventBus, etcPeerManager, syncConfig, scheduler), @@ -39,7 +44,7 @@ class RegularSync( val importer: ActorRef = context.actorOf( BlockImporter.props( - fetcher, + fetcher.toClassic, ledger, blockchain, syncConfig, @@ -55,7 +60,7 @@ class RegularSync( scheduler.scheduleWithFixedDelay( syncConfig.printStatusInterval, syncConfig.printStatusInterval, - fetcher, + fetcher.toClassic, BlockFetcher.PrintStatus )(context.dispatcher) val printImporterSchedule: Cancellable = @@ -95,12 +100,10 @@ class RegularSync( log.info(s"Got information about new block [number = $blockNumber]") val newState = progressState.copy(bestKnownNetworkBlock = blockNumber) context become running(newState) - case ProgressProtocol.ImportedBlock(blockNumber, isCheckpoint, internally) => + case ProgressProtocol.ImportedBlock(blockNumber, internally) => log.info(s"Imported new block [number = $blockNumber, internally = $internally]") val newState = progressState.copy(currentBlock = blockNumber) - if (internally && isCheckpoint) { - fetcher ! InternalCheckpointImport(blockNumber) - } else if (internally) { + if (internally) { fetcher ! InternalLastBlockImport(blockNumber) } context become running(newState) @@ -166,6 +169,6 @@ object RegularSync { case object StartedFetching extends ProgressProtocol case class StartingFrom(blockNumber: BigInt) extends ProgressProtocol case class GotNewBlock(blockNumber: BigInt) extends ProgressProtocol - case class ImportedBlock(blockNumber: BigInt, isCheckpoint: Boolean, internally: Boolean) extends ProgressProtocol + case class ImportedBlock(blockNumber: BigInt, internally: Boolean) extends ProgressProtocol } } diff --git a/src/main/scala/io/iohk/ethereum/blockchain/sync/regular/StateNodeFetcher.scala b/src/main/scala/io/iohk/ethereum/blockchain/sync/regular/StateNodeFetcher.scala new file mode 100644 index 0000000000..18f1ab6046 --- /dev/null +++ b/src/main/scala/io/iohk/ethereum/blockchain/sync/regular/StateNodeFetcher.scala @@ -0,0 +1,104 @@ +package io.iohk.ethereum.blockchain.sync.regular + +import akka.actor.typed.scaladsl.{AbstractBehavior, ActorContext, Behaviors} +import akka.actor.typed.{ActorRef, Behavior} +import akka.actor.{ActorRef => ClassicActorRef} +import akka.util.ByteString +import io.iohk.ethereum.blockchain.sync.PeersClient._ +import io.iohk.ethereum.blockchain.sync.regular.BlockFetcher.{FetchCommand, FetchedStateNode} +import io.iohk.ethereum.crypto.kec256 +import io.iohk.ethereum.network.Peer +import io.iohk.ethereum.network.p2p.Message +import io.iohk.ethereum.network.p2p.messages.PV63.{GetNodeData, NodeData} +import io.iohk.ethereum.utils.Config.SyncConfig +import cats.syntax.either._ +import monix.execution.Scheduler + +import scala.util.{Failure, Success} + +class StateNodeFetcher( + val peersClient: ClassicActorRef, + val syncConfig: SyncConfig, + val supervisor: ActorRef[FetchCommand], + context: ActorContext[StateNodeFetcher.StateNodeFetcherCommand] +) extends AbstractBehavior[StateNodeFetcher.StateNodeFetcherCommand](context) + with FetchRequest[StateNodeFetcher.StateNodeFetcherCommand] { + + val log = context.log + implicit val ec: Scheduler = Scheduler(context.executionContext) + + import StateNodeFetcher._ + + override def makeAdaptedMessage[T <: Message](peer: Peer, msg: T): StateNodeFetcherCommand = AdaptedMessage(peer, msg) + + private var requester: Option[StateNodeRequester] = None + + override def onMessage(message: StateNodeFetcherCommand): Behavior[StateNodeFetcherCommand] = { + message match { + case StateNodeFetcher.FetchStateNode(hash, sender) => + log.debug("Start fetching state node") + requestStateNode(hash) + requester = Some(StateNodeRequester(hash, sender)) + Behaviors.same + case AdaptedMessage(peer, NodeData(values)) if requester.isDefined => + log.debug("Received state node response from peer {}", peer) + + requester + .collect(stateNodeRequester => { + val validatedNode = values + .asRight[String] + .ensure(s"Empty response from peer $peer, blacklisting")(_.nonEmpty) + .ensure("Fetched node state hash doesn't match requested one, blacklisting peer")(nodes => + stateNodeRequester.hash == kec256(nodes.head) + ) + + validatedNode match { + case Left(err) => + log.debug(err) + peersClient ! BlacklistPeer(peer.id, err) + context.self ! StateNodeFetcher.FetchStateNode(stateNodeRequester.hash, stateNodeRequester.replyTo) + Behaviors.same[StateNodeFetcherCommand] + case Right(node) => + stateNodeRequester.replyTo ! FetchedStateNode(NodeData(node)) + requester = None + Behaviors.same[StateNodeFetcherCommand] + } + }) + .getOrElse(Behaviors.same) + + case StateNodeFetcher.RetryStateNodeRequest if requester.isDefined => + log.debug("Something failed on a state node request, trying again") + requester + .collect(stateNodeRequester => + context.self ! StateNodeFetcher.FetchStateNode(stateNodeRequester.hash, stateNodeRequester.replyTo) + ) + Behaviors.same + case _ => Behaviors.unhandled + } + } + + private def requestStateNode(hash: ByteString): Unit = { + val resp = makeRequest(Request.create(GetNodeData(List(hash)), BestPeer), StateNodeFetcher.RetryStateNodeRequest) + context.pipeToSelf(resp.runToFuture) { + case Success(res) => res + case Failure(_) => StateNodeFetcher.RetryStateNodeRequest + } + } +} + +object StateNodeFetcher { + + def apply( + peersClient: ClassicActorRef, + syncConfig: SyncConfig, + supervisor: ActorRef[FetchCommand] + ): Behavior[StateNodeFetcherCommand] = + Behaviors.setup(context => new StateNodeFetcher(peersClient, syncConfig, supervisor, context)) + + sealed trait StateNodeFetcherCommand + final case class FetchStateNode(hash: ByteString, originalSender: ClassicActorRef) extends StateNodeFetcherCommand + final case object RetryStateNodeRequest extends StateNodeFetcherCommand + private final case class AdaptedMessage[T <: Message](peer: Peer, msg: T) extends StateNodeFetcherCommand + + final case class StateNodeRequester(hash: ByteString, replyTo: ClassicActorRef) +} diff --git a/src/test/scala/io/iohk/ethereum/blockchain/sync/regular/BlockFetcherSpec.scala b/src/test/scala/io/iohk/ethereum/blockchain/sync/regular/BlockFetcherSpec.scala index c3fc3117f7..3a436f0c11 100644 --- a/src/test/scala/io/iohk/ethereum/blockchain/sync/regular/BlockFetcherSpec.scala +++ b/src/test/scala/io/iohk/ethereum/blockchain/sync/regular/BlockFetcherSpec.scala @@ -1,40 +1,40 @@ package io.iohk.ethereum.blockchain.sync.regular +import java.net.InetSocketAddress + import akka.actor.ActorSystem -import akka.testkit.{TestKit, TestProbe} -import cats.data.NonEmptyList +import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit +import akka.actor.typed.scaladsl.adapter._ +import akka.testkit.TestProbe import com.miguno.akka.testing.VirtualTime import io.iohk.ethereum.Fixtures.{Blocks => FixtureBlocks} import io.iohk.ethereum.Mocks.{MockValidatorsAlwaysSucceed, MockValidatorsFailingOnBlockBodies} import io.iohk.ethereum.blockchain.sync.PeersClient.BlacklistPeer -import io.iohk.ethereum.blockchain.sync.regular.BlockFetcher.{InternalLastBlockImport, InvalidateBlocksFrom, PickBlocks} +import io.iohk.ethereum.blockchain.sync.regular.BlockFetcher.{AdaptedMessageFromEventBus, InternalLastBlockImport, InvalidateBlocksFrom, PickBlocks} import io.iohk.ethereum.blockchain.sync.{PeersClient, TestSyncConfig} -import io.iohk.ethereum.checkpointing.CheckpointingTestHelpers -import io.iohk.ethereum.consensus.blocks.CheckpointBlockGenerator -import io.iohk.ethereum.domain.{Block, ChainWeight, Checkpoint, HeadersSeq} -import io.iohk.ethereum.network.{Peer, PeerId} -import io.iohk.ethereum.network.PeerEventBusActor.PeerEvent.MessageFromPeer +import io.iohk.ethereum.domain.{Block, HeadersSeq} import io.iohk.ethereum.network.PeerEventBusActor.SubscriptionClassifier.MessageClassifier import io.iohk.ethereum.network.PeerEventBusActor.{PeerSelector, Subscribe} +import io.iohk.ethereum.network.p2p.messages.Codes import io.iohk.ethereum.network.p2p.messages.CommonMessages.NewBlock import io.iohk.ethereum.network.p2p.messages.PV62._ -import io.iohk.ethereum.network.p2p.messages.{Codes, PV64} +import io.iohk.ethereum.network.{Peer, PeerId} import io.iohk.ethereum.security.SecureRandomBuilder -import io.iohk.ethereum.{BlockHelpers, Timeouts, WithActorSystemShutDown, crypto} +import io.iohk.ethereum.{BlockHelpers, Timeouts} import org.scalatest.freespec.AnyFreeSpecLike import org.scalatest.matchers.should.Matchers -import java.net.InetSocketAddress import scala.concurrent.ExecutionContext.Implicits.global import scala.concurrent.duration._ class BlockFetcherSpec - extends TestKit(ActorSystem("BlockFetcherSpec_System")) + extends ScalaTestWithActorTestKit() with AnyFreeSpecLike - with WithActorSystemShutDown with Matchers with SecureRandomBuilder { + val as: ActorSystem = ActorSystem("BlockFetcherSpec_System") + "BlockFetcher" - { "should not requests headers upon invalidation while a request is already in progress, should resume after response" in new TestSetup { @@ -53,7 +53,7 @@ class BlockFetcherSpec ) // Save the reference to respond to the ask pattern on fetcher val refExpectingReply = peersClient.expectMsgPF() { - case PeersClient.Request(msg, _, _) if msg == secondGetBlockHeadersRequest => peersClient.lastSender + case PeersClient.Request(`secondGetBlockHeadersRequest`, _, _) => peersClient.lastSender } // Mark first blocks as invalid, no further request should be done @@ -116,7 +116,7 @@ class BlockFetcherSpec peersClient.expectMsgPF() { case PeersClient.Request(msg, _, _) if msg == firstGetBlockBodiesRequest => () } // Fetcher should not enqueue any new block - importer.send(blockFetcher, PickBlocks(syncConfig.blocksBatchSize)) + importer.send(blockFetcher.toClassic, PickBlocks(syncConfig.blocksBatchSize, importer.ref)) importer.ignoreMsg({ case BlockImporter.NotOnTop => true }) importer.expectNoMessage(100.millis) } @@ -128,7 +128,7 @@ class BlockFetcherSpec handleFirstBlockBatchHeaders() val getBlockBodiesRequest1 = GetBlockBodies(firstBlocksBatch.map(_.hash)) - peersClient.expectMsgPF() { case PeersClient.Request(msg, _, _) if msg == getBlockBodiesRequest1 => () } + peersClient.fishForMessage() { case PeersClient.Request(`getBlockBodiesRequest1`, _, _) => true} // It will receive all the requested bodies, but splitted in 2 parts. val (subChain1, subChain2) = firstBlocksBatch.splitAt(syncConfig.blockBodiesPerRequest / 2) @@ -137,15 +137,15 @@ class BlockFetcherSpec peersClient.reply(PeersClient.Response(fakePeer, getBlockBodiesResponse1)) val getBlockBodiesRequest2 = GetBlockBodies(subChain2.map(_.hash)) - peersClient.expectMsgPF() { case PeersClient.Request(msg, _, _) if msg == getBlockBodiesRequest2 => () } + peersClient.fishForSpecificMessage() { case PeersClient.Request(`getBlockBodiesRequest2`, _, _) => true} val getBlockBodiesResponse2 = BlockBodies(subChain2.map(_.body)) peersClient.reply(PeersClient.Response(fakePeer, getBlockBodiesResponse2)) // We need to wait a while in order to allow fetcher to process all the blocks - system.scheduler.scheduleOnce(Timeouts.shortTimeout) { + as.scheduler.scheduleOnce(Timeouts.shortTimeout) { // Fetcher should enqueue all the received blocks - importer.send(blockFetcher, PickBlocks(firstBlocksBatch.size)) + importer.send(blockFetcher.toClassic, PickBlocks(firstBlocksBatch.size, importer.ref)) } importer.ignoreMsg({ case BlockImporter.NotOnTop => true }) @@ -161,7 +161,7 @@ class BlockFetcherSpec handleFirstBlockBatchHeaders() val getBlockBodiesRequest1 = GetBlockBodies(firstBlocksBatch.map(_.hash)) - peersClient.expectMsgPF() { case PeersClient.Request(msg, _, _) if msg == getBlockBodiesRequest1 => () } + peersClient.expectMsgPF() { case PeersClient.Request(`getBlockBodiesRequest1`, _, _) => () } // It will receive part of the requested bodies. val (subChain1, subChain2) = firstBlocksBatch.splitAt(syncConfig.blockBodiesPerRequest / 2) @@ -170,14 +170,14 @@ class BlockFetcherSpec peersClient.reply(PeersClient.Response(fakePeer, getBlockBodiesResponse1)) val getBlockBodiesRequest2 = GetBlockBodies(subChain2.map(_.hash)) - peersClient.expectMsgPF() { case PeersClient.Request(msg, _, _) if msg == getBlockBodiesRequest2 => () } + peersClient.expectMsgPF() { case PeersClient.Request(`getBlockBodiesRequest2`, _, _) => () } // We receive empty bodies instead of the second part val getBlockBodiesResponse2 = BlockBodies(List()) peersClient.reply(PeersClient.Response(fakePeer, getBlockBodiesResponse2)) // If we try to pick the whole chain we should only receive the first part - importer.send(blockFetcher, PickBlocks(firstBlocksBatch.size)) + importer.send(blockFetcher.toClassic, PickBlocks(firstBlocksBatch.size, importer.ref)) importer.ignoreMsg({ case BlockImporter.NotOnTop => true }) importer.expectMsgPF() { case BlockFetcher.PickedBlocks(blocks) => blocks.map(_.hash).toList shouldEqual subChain1.map(_.hash) @@ -238,7 +238,7 @@ class BlockFetcherSpec PeersClient.Response(fakePeer, BlockBodies(alternativeSecondBlocksBatch.drop(6).map(_.body))) ) - importer.send(blockFetcher, PickBlocks(syncConfig.blocksBatchSize)) + importer.send(blockFetcher.toClassic, PickBlocks(syncConfig.blocksBatchSize, importer.ref)) importer.ignoreMsg({ case BlockImporter.NotOnTop => true }) importer.expectMsgPF() { case BlockFetcher.PickedBlocks(blocks) => val headers = blocks.map(_.header).toList @@ -247,241 +247,6 @@ class BlockFetcherSpec } } - "should process checkpoint blocks when checkpoint can fit into ready blocks queue" in new TestSetup { - startFetcher() - - triggerFetching(20) - - val secondBlocksBatch = BlockHelpers.generateChain(syncConfig.blockHeadersPerRequest, firstBlocksBatch.last) - val checkpointBlock = (new CheckpointBlockGenerator) - .generate( - firstBlocksBatch.last, - Checkpoint( - CheckpointingTestHelpers.createCheckpointSignatures( - Seq(crypto.generateKeyPair(secureRandom)), - firstBlocksBatch.last.hash - ) - ) - ) - - handleFirstBlockBatchHeaders() - - // Fetcher second request for headers - val secondGetBlockHeadersRequest = - GetBlockHeaders( - Left(secondBlocksBatch.head.number), - syncConfig.blockHeadersPerRequest, - skip = 0, - reverse = false - ) - peersClient.expectMsgPF() { case PeersClient.Request(msg, _, _) if msg == secondGetBlockHeadersRequest => () } - - // Respond second headers request - val secondGetBlockHeadersResponse = BlockHeaders(secondBlocksBatch.map(_.header)) - peersClient.reply(PeersClient.Response(fakePeer, secondGetBlockHeadersResponse)) - - handleFirstBlockBatchBodies() - - // Second bodies request - val secondGetBlockBodiesRequest = GetBlockBodies(secondBlocksBatch.map(_.hash)) - peersClient.expectMsgPF() { case PeersClient.Request(msg, _, _) if msg == secondGetBlockBodiesRequest => () } - - // Second bodies response - val secondGetBlockBodiesResponse = BlockBodies(secondBlocksBatch.map(_.body)) - peersClient.reply(PeersClient.Response(fakePeer, secondGetBlockBodiesResponse)) - - // send old checkpoint block - blockFetcher ! MessageFromPeer( - PV64.NewBlock(checkpointBlock, ChainWeight(checkpointBlock.number, checkpointBlock.header.difficulty)), - fakePeer.id - ) - - importer.send(blockFetcher, PickBlocks(syncConfig.blocksBatchSize * 2)) - importer.expectMsgPF() { case BlockFetcher.PickedBlocks(blocks) => - val headers = blocks.map(_.header).toList - - assert(HeadersSeq.areChain(headers)) - assert(headers.lastOption.contains(checkpointBlock.header)) - } - } - - "should process checkpoint blocks when checkpoint can fit into waiting headers queue" in new TestSetup { - startFetcher() - - triggerFetching(20) - - val secondBlocksBatch = BlockHelpers.generateChain(syncConfig.blockHeadersPerRequest, firstBlocksBatch.last) - val secondBlocksBatchFirstPart = secondBlocksBatch.splitAt(5)._1 - val checkpointBlock = (new CheckpointBlockGenerator) - .generate( - secondBlocksBatchFirstPart.last, - Checkpoint( - CheckpointingTestHelpers.createCheckpointSignatures( - Seq(crypto.generateKeyPair(secureRandom)), - secondBlocksBatchFirstPart.last.hash - ) - ) - ) - - val alternativeSecondBlocksBatch = secondBlocksBatchFirstPart :+ checkpointBlock - - handleFirstBlockBatchHeaders() - - // Fetcher second request for headers - val secondGetBlockHeadersRequest = - GetBlockHeaders( - Left(secondBlocksBatch.head.number), - syncConfig.blockHeadersPerRequest, - skip = 0, - reverse = false - ) - peersClient.expectMsgPF() { case PeersClient.Request(msg, _, _) if msg == secondGetBlockHeadersRequest => () } - - // Respond second headers request - val secondGetBlockHeadersResponse = BlockHeaders(secondBlocksBatch.map(_.header)) - peersClient.reply(PeersClient.Response(fakePeer, secondGetBlockHeadersResponse)) - - handleFirstBlockBatchBodies() - - // second bodies request - val secondGetBlockBodiesRequest = GetBlockBodies(secondBlocksBatch.map(_.hash)) - peersClient.expectMsgPF() { case PeersClient.Request(msg, _, _) if msg == secondGetBlockBodiesRequest => () } - - // send old checkpoint block - blockFetcher ! MessageFromPeer( - PV64.NewBlock(checkpointBlock, ChainWeight(checkpointBlock.number, checkpointBlock.header.difficulty)), - fakePeer.id - ) - - // second bodies response - val secondGetBlockBodiesResponse = BlockBodies(secondBlocksBatch.map(_.body)) - peersClient.reply(PeersClient.Response(fakePeer, secondGetBlockBodiesResponse)) - - // third bodies request after adding checkpoint into the waiting headers queue - val thirdGetBlockBodiesRequest = GetBlockBodies(alternativeSecondBlocksBatch.map(_.hash)) - peersClient.expectMsgPF() { case PeersClient.Request(msg, _, _) if msg == thirdGetBlockBodiesRequest => () } - - // third bodies response - val thirdGetBlockBodiesResponse = BlockBodies(alternativeSecondBlocksBatch.map(_.body)) - peersClient.reply(PeersClient.Response(fakePeer, thirdGetBlockBodiesResponse)) - - // We need to wait a while in order to allow fetcher to process all the blocks - system.scheduler.scheduleOnce(Timeouts.shortTimeout) { - importer.send(blockFetcher, PickBlocks(syncConfig.blocksBatchSize * 2)) - } - - importer.expectMsgPF() { case BlockFetcher.PickedBlocks(blocks) => - val headers = blocks.map(_.header).toList - - assert(HeadersSeq.areChain(headers)) - assert(headers.contains(checkpointBlock.header)) - } - } - - "should process checkpoint blocks when checkpoint not fit into queues" in new TestSetup { - startFetcher() - - triggerFetching(10) - - val alternativeBlocksBatch = - BlockHelpers.generateChain(syncConfig.blockHeadersPerRequest / 2, FixtureBlocks.Genesis.block) - val checkpointBlock = (new CheckpointBlockGenerator) - .generate( - alternativeBlocksBatch.last, - Checkpoint( - CheckpointingTestHelpers.createCheckpointSignatures( - Seq(crypto.generateKeyPair(secureRandom)), - alternativeBlocksBatch.last.hash - ) - ) - ) - - val alternativeBlocksBatchWithCheckpoint = alternativeBlocksBatch :+ checkpointBlock - - handleFirstBlockBatch() - - // send checkpoint block - blockFetcher ! MessageFromPeer( - PV64.NewBlock(checkpointBlock, ChainWeight(checkpointBlock.number, checkpointBlock.header.difficulty)), - fakePeer.id - ) - - // Fetcher new request for headers - peersClient.expectMsgPF() { case PeersClient.Request(msg, _, _) if msg == firstGetBlockHeadersRequest => () } - - // Respond first headers request - val newGetBlockHeadersResponse = BlockHeaders(alternativeBlocksBatchWithCheckpoint.map(_.header)) - peersClient.reply(PeersClient.Response(fakePeer, newGetBlockHeadersResponse)) - - // new bodies request - val newGetBlockBodiesRequest = GetBlockBodies(alternativeBlocksBatchWithCheckpoint.map(_.hash)) - peersClient.expectMsgPF() { case PeersClient.Request(msg, _, _) if msg == newGetBlockBodiesRequest => () } - - // First bodies response - val newGetBlockBodiesResponse = BlockBodies(alternativeBlocksBatchWithCheckpoint.map(_.body)) - peersClient.reply(PeersClient.Response(fakePeer, newGetBlockBodiesResponse)) - - // We need to wait a while in order to allow fetcher to process all the blocks - system.scheduler.scheduleOnce(Timeouts.shortTimeout) { - importer.send(blockFetcher, PickBlocks(syncConfig.blocksBatchSize)) - } - - importer.expectMsgPF() { case BlockFetcher.PickedBlocks(blocks) => - val headers = blocks.map(_.header).toList - - assert(HeadersSeq.areChain(headers)) - assert(headers.contains(checkpointBlock.header)) - } - } - - "should put checkpoint to ready blocks when checkpoint block is older than last block" in new TestSetup { - startFetcher() - - triggerFetching(10) - - val alternativeBlocksBatch = - BlockHelpers.generateChain(syncConfig.blockHeadersPerRequest / 2, FixtureBlocks.Genesis.block) - val checkpointBlock = (new CheckpointBlockGenerator) - .generate( - alternativeBlocksBatch.last, - Checkpoint( - CheckpointingTestHelpers.createCheckpointSignatures( - Seq(crypto.generateKeyPair(secureRandom)), - alternativeBlocksBatch.last.hash - ) - ) - ) - - handleFirstBlockBatch() - - // We need to wait a while in order to allow fetcher to process all the blocks - system.scheduler.scheduleOnce(Timeouts.shortTimeout) { - importer.send(blockFetcher, PickBlocks(syncConfig.blocksBatchSize)) - } - - importer.expectMsg(BlockFetcher.PickedBlocks(NonEmptyList(firstBlocksBatch.head, firstBlocksBatch.tail))) - - // send old checkpoint block - blockFetcher ! MessageFromPeer( - PV64.NewBlock(checkpointBlock, ChainWeight(checkpointBlock.number, checkpointBlock.header.difficulty)), - fakePeer.id - ) - - importer.expectMsg(BlockImporter.OnTop) - - // We need to wait a while in order to allow fetcher to process all the blocks - system.scheduler.scheduleOnce(Timeouts.shortTimeout) { - importer.send(blockFetcher, PickBlocks(syncConfig.blocksBatchSize)) - } - - importer.expectMsgPF() { case BlockFetcher.PickedBlocks(blocks) => - val headers = blocks.map(_.header).toList - - assert(HeadersSeq.areChain(headers)) - assert(headers.contains(checkpointBlock.header)) - } - } - "should properly handle a request timeout" in new TestSetup { override lazy val syncConfig = defaultSyncConfig.copy( // Small timeout on ask pattern for testing it here @@ -490,22 +255,22 @@ class BlockFetcherSpec startFetcher() - peersClient.expectMsgPF() { case PeersClient.Request(msg, _, _) if msg == firstGetBlockHeadersRequest => () } + peersClient.expectMsgPF() { case PeersClient.Request(`firstGetBlockHeadersRequest`, _, _) => () } // Request should timeout without any response from the peer Thread.sleep((syncConfig.peerResponseTimeout + 2.seconds).toMillis) - peersClient.expectMsgPF() { case PeersClient.Request(msg, _, _) if msg == firstGetBlockHeadersRequest => () } + peersClient.expectMsgPF() { case PeersClient.Request(`firstGetBlockHeadersRequest`, _, _) => () } } } trait TestSetup extends TestSyncConfig { val time = new VirtualTime - val peersClient: TestProbe = TestProbe() - val peerEventBus: TestProbe = TestProbe() - val importer: TestProbe = TestProbe() - val regularSync: TestProbe = TestProbe() + val peersClient = TestProbe()(as) + val peerEventBus = TestProbe()(as) + val importer = TestProbe()(as) + val regularSync = TestProbe()(as) lazy val validators = new MockValidatorsAlwaysSucceed @@ -518,12 +283,11 @@ class BlockFetcherSpec peerResponseTimeout = 5.minutes ) - val fakePeerActor: TestProbe = TestProbe() + val fakePeerActor: TestProbe = TestProbe()(as) val fakePeer = Peer(PeerId("fakePeer"), new InetSocketAddress("127.0.0.1", 9000), fakePeerActor.ref, false) - lazy val blockFetcher = system.actorOf( - BlockFetcher - .props( + lazy val blockFetcher = spawn( + BlockFetcher( peersClient.ref, peerEventBus.ref, regularSync.ref, @@ -552,7 +316,7 @@ class BlockFetcherSpec val farAwayBlock = Block(FixtureBlocks.ValidBlock.header.copy(number = startingNumber), FixtureBlocks.ValidBlock.body) - blockFetcher ! MessageFromPeer(NewBlock(farAwayBlock, farAwayBlockTotalDifficulty), fakePeer.id) + blockFetcher ! AdaptedMessageFromEventBus(NewBlock(farAwayBlock, farAwayBlockTotalDifficulty), fakePeer.id) } val firstBlocksBatch = BlockHelpers.generateChain(syncConfig.blockHeadersPerRequest, FixtureBlocks.Genesis.block) @@ -562,7 +326,7 @@ class BlockFetcherSpec GetBlockHeaders(Left(1), syncConfig.blockHeadersPerRequest, skip = 0, reverse = false) def handleFirstBlockBatchHeaders() = { - peersClient.expectMsgPF() { case PeersClient.Request(msg, _, _) if msg == firstGetBlockHeadersRequest => () } + peersClient.expectMsgPF() { case PeersClient.Request(`firstGetBlockHeadersRequest`, _, _) => () } // Respond first headers request val firstGetBlockHeadersResponse = BlockHeaders(firstBlocksBatch.map(_.header)) @@ -572,7 +336,7 @@ class BlockFetcherSpec // First bodies request val firstGetBlockBodiesRequest = GetBlockBodies(firstBlocksBatch.map(_.hash)) def handleFirstBlockBatchBodies() = { - peersClient.expectMsgPF() { case PeersClient.Request(msg, _, _) if msg == firstGetBlockBodiesRequest => () } + peersClient.expectMsgPF() { case PeersClient.Request(`firstGetBlockBodiesRequest`, _, _) => () } // First bodies response val firstGetBlockBodiesResponse = BlockBodies(firstBlocksBatch.map(_.body)) diff --git a/src/test/scala/io/iohk/ethereum/blockchain/sync/regular/BlockFetcherStateSpec.scala b/src/test/scala/io/iohk/ethereum/blockchain/sync/regular/BlockFetcherStateSpec.scala index 9fcb4549bd..7a85def5bb 100644 --- a/src/test/scala/io/iohk/ethereum/blockchain/sync/regular/BlockFetcherStateSpec.scala +++ b/src/test/scala/io/iohk/ethereum/blockchain/sync/regular/BlockFetcherStateSpec.scala @@ -72,65 +72,5 @@ class BlockFetcherStateSpec assert(result.map(_.waitingHeaders) === Left("Given headers should form a sequence with ready blocks")) } } - - "trying to insert block into the queues" should { - "insert block into the ready blocks queue" in { - val (front, _) = blocks.splitAt(2) - val testBlock = BlockHelpers.generateBlock(front.last) - val peerId = PeerId("bar") - - val result = BlockFetcherState - .initial(importer, validators.blockValidator, 0) - .appendHeaders(blocks.map(_.header)) - .map(_.handleRequestedBlocks(blocks, peer)) - .flatMap(_.tryInsertBlock(testBlock, peerId)) - - assert(result.map(_.waitingHeaders) === Right(Queue.empty)) - assert(result.map(_.readyBlocks) === Right(Queue.empty.enqueueAll(front :+ testBlock))) - front.foreach { block => - assert(result.map(_.blockProviders(block.number)) === Right(peer)) - } - assert(result.map(_.blockProviders(testBlock.number)) === Right(peerId)) - assert(result.map(_.knownTop) === Right(testBlock.number)) - } - - "insert block into the waiting headers queue" in { - val (front, _) = blocks.splitAt(2) - val testBlock = BlockHelpers.generateBlock(front.last) - - val result = BlockFetcherState - .initial(importer, validators.blockValidator, 0) - .appendHeaders(blocks.map(_.header)) - .flatMap(_.tryInsertBlock(testBlock, peer)) - - assert(result.map(_.readyBlocks) === Right(Queue.empty)) - assert(result.map(_.waitingHeaders) === Right(Queue.empty.enqueueAll((front :+ testBlock).map(_.header)))) - assert(result.map(_.knownTop) === Right(testBlock.number)) - } - - "return state without changes when block is already in the queues" in { - val (front, _) = blocks.splitAt(2) - val testBlock = front.last - - val initial = BlockFetcherState - .initial(importer, validators.blockValidator, 0) - .appendHeaders(blocks.map(_.header)) - - val result = initial.flatMap(_.tryInsertBlock(testBlock, peer)) - - assert(result === initial) - } - - "return error msg when cannot insert block" in { - val testBlock = BlockHelpers.generateBlock(BlockHelpers.genesis) - - val result = BlockFetcherState - .initial(importer, validators.blockValidator, 0) - .appendHeaders(blocks.map(_.header)) - .flatMap(_.tryInsertBlock(testBlock, peer)) - - assert(result === Left(s"Cannot insert block [${ByteStringUtils.hash2string(testBlock.hash)}] into the queues")) - } - } } }