From 3e6d236749e647a5a1518fb082db08fa35c66c34 Mon Sep 17 00:00:00 2001 From: mk Date: Tue, 3 Nov 2020 15:29:43 -0300 Subject: [PATCH 01/11] [ETCM-283] Add block bodies validation in block fetcher --- .../sync/regular/BlockFetcher.scala | 11 ++++-- .../sync/regular/BlockFetcherState.scala | 39 +++++++++++++++---- 2 files changed, 40 insertions(+), 10 deletions(-) diff --git a/src/main/scala/io/iohk/ethereum/blockchain/sync/regular/BlockFetcher.scala b/src/main/scala/io/iohk/ethereum/blockchain/sync/regular/BlockFetcher.scala index 5be786a039..4186e03866 100644 --- a/src/main/scala/io/iohk/ethereum/blockchain/sync/regular/BlockFetcher.scala +++ b/src/main/scala/io/iohk/ethereum/blockchain/sync/regular/BlockFetcher.scala @@ -143,10 +143,15 @@ class BlockFetcher( log.debug("Received {} block bodies that will be ignored", bodies.size) state.withBodiesFetchReceived } else { - log.debug("Fetched {} block bodies", bodies.size) - state.withBodiesFetchReceived.addBodies(peer, bodies) + state.validateBodies(bodies) match { + case Left(err) => + peersClient ! BlacklistPeer(peer.id, err) + state.withBodiesFetchReceived + case Right(newBlocks) => + log.debug("Fetched {} block bodies", newBlocks.size) + state.withBodiesFetchReceived.appendNewBlocks(newBlocks, peer.id) + } } - fetchBlocks(newState) case RetryBodiesRequest if state.isFetchingBodies => log.debug("Time-out occurred while waiting for bodies") diff --git a/src/main/scala/io/iohk/ethereum/blockchain/sync/regular/BlockFetcherState.scala b/src/main/scala/io/iohk/ethereum/blockchain/sync/regular/BlockFetcherState.scala index 134ffa816f..e6e46c89c2 100644 --- a/src/main/scala/io/iohk/ethereum/blockchain/sync/regular/BlockFetcherState.scala +++ b/src/main/scala/io/iohk/ethereum/blockchain/sync/regular/BlockFetcherState.scala @@ -3,14 +3,17 @@ package io.iohk.ethereum.blockchain.sync.regular import akka.actor.ActorRef import akka.util.ByteString import cats.data.NonEmptyList +//FIXME: By using this class, we are coupling sync process with a specific consensus (the standard one). +import io.iohk.ethereum.consensus.validators.std.StdBlockValidator import io.iohk.ethereum.domain.{Block, BlockHeader, BlockBody, HeadersSeq} -import io.iohk.ethereum.network.{Peer, PeerId} +import io.iohk.ethereum.network.PeerId import io.iohk.ethereum.network.p2p.messages.PV62.BlockHash import BlockFetcherState._ import cats.syntax.either._ import cats.syntax.option._ import scala.collection.immutable.Queue +import scala.annotation.tailrec // scalastyle:off number.of.methods /** @@ -101,14 +104,36 @@ case class BlockFetcherState( } ) - def addBodies(peer: Peer, bodies: Seq[BlockBody]): BlockFetcherState = { - val (matching, waiting) = waitingHeaders.splitAt(bodies.length) - val blocks = matching.zip(bodies).map((Block.apply _).tupled) + def validateBodies(receivedBodies: Seq[BlockBody]): Either[String, Seq[Block]] = + bodiesAreOrderedSubsetOfRequested(waitingHeaders.toList, receivedBodies) + .toRight( + "Received unrequested bodies" + ) + + // Checks that the received block bodies are an ordered subset of the ones requested + @tailrec + private def bodiesAreOrderedSubsetOfRequested( + requestedHeaders: Seq[BlockHeader], + respondedBodies: Seq[BlockBody], + matchedBlocks: Seq[Block] = Nil + ): Option[Seq[Block]] = + (requestedHeaders, respondedBodies) match { + case (Seq(), _ +: _) => None + case (_, Seq()) => Some(matchedBlocks) + case (header +: remainingHeaders, body +: remainingBodies) => + val doMatch = StdBlockValidator.validateHeaderAndBody(header, body).isRight + if (doMatch) + bodiesAreOrderedSubsetOfRequested(remainingHeaders, remainingBodies, matchedBlocks :+ Block(header, body)) + else + bodiesAreOrderedSubsetOfRequested(remainingHeaders, respondedBodies, matchedBlocks) + } - withPeerForBlocks(peer.id, blocks.map(_.header.number)) + def appendNewBlocks(blocks: Seq[Block], fromPeer: PeerId): BlockFetcherState = { + val receivedHeaders = blocks.map(_.header) + withPeerForBlocks(fromPeer, blocks.map(_.header.number)) .copy( - readyBlocks = readyBlocks.enqueue(blocks), - waitingHeaders = waiting + readyBlocks = readyBlocks.enqueue(blocks.toList), + waitingHeaders = waitingHeaders.diff(receivedHeaders) ) } From 6a71cb3e7d9c1bd7853a8128c9d03e5586986eac Mon Sep 17 00:00:00 2001 From: mk Date: Wed, 4 Nov 2020 10:58:38 -0300 Subject: [PATCH 02/11] [ETCM-283] Pass block validator as param --- .../sync/util/RegularSyncItSpecUtils.scala | 3 ++ .../blockchain/sync/SyncController.scala | 1 + .../sync/regular/BlockFetcher.scala | 7 +++-- .../sync/regular/BlockFetcherState.scala | 31 ++++++++++--------- .../blockchain/sync/regular/RegularSync.scala | 9 +++++- .../sync/regular/BlockFetcherSpec.scala | 4 +++ .../sync/regular/BlockFetcherStateSpec.scala | 9 ++++-- .../sync/regular/RegularSyncFixtures.scala | 1 + 8 files changed, 46 insertions(+), 19 deletions(-) diff --git a/src/it/scala/io/iohk/ethereum/sync/util/RegularSyncItSpecUtils.scala b/src/it/scala/io/iohk/ethereum/sync/util/RegularSyncItSpecUtils.scala index 981b3b0e66..a84ecbc079 100644 --- a/src/it/scala/io/iohk/ethereum/sync/util/RegularSyncItSpecUtils.scala +++ b/src/it/scala/io/iohk/ethereum/sync/util/RegularSyncItSpecUtils.scala @@ -63,6 +63,8 @@ object RegularSyncItSpecUtils { "pending-transactions-manager" ) + lazy val validators = new MockValidatorsAlwaysSucceed + lazy val regularSync = system.actorOf( RegularSync.props( peersClient, @@ -71,6 +73,7 @@ object RegularSyncItSpecUtils { ledger, bl, blockchainConfig, // FIXME: remove in ETCM-280 + validators.blockValidator, testSyncConfig, ommersPool, pendingTransactionsManager, diff --git a/src/main/scala/io/iohk/ethereum/blockchain/sync/SyncController.scala b/src/main/scala/io/iohk/ethereum/blockchain/sync/SyncController.scala index e197ddfc96..9225dcbdf3 100644 --- a/src/main/scala/io/iohk/ethereum/blockchain/sync/SyncController.scala +++ b/src/main/scala/io/iohk/ethereum/blockchain/sync/SyncController.scala @@ -102,6 +102,7 @@ class SyncController( ledger, blockchain, blockchainConfig, + validators.blockValidator, syncConfig, ommersPool, pendingTransactionsManager, diff --git a/src/main/scala/io/iohk/ethereum/blockchain/sync/regular/BlockFetcher.scala b/src/main/scala/io/iohk/ethereum/blockchain/sync/regular/BlockFetcher.scala index 4186e03866..fcf994f3fd 100644 --- a/src/main/scala/io/iohk/ethereum/blockchain/sync/regular/BlockFetcher.scala +++ b/src/main/scala/io/iohk/ethereum/blockchain/sync/regular/BlockFetcher.scala @@ -8,6 +8,7 @@ import cats.data.NonEmptyList import cats.instances.future._ import cats.instances.option._ import cats.syntax.either._ +import io.iohk.ethereum.consensus.validators.BlockValidator import io.iohk.ethereum.blockchain.sync.PeersClient._ import io.iohk.ethereum.blockchain.sync.regular.BlockFetcherState.{ AwaitingBodiesToBeIgnored, @@ -37,6 +38,7 @@ class BlockFetcher( val peerEventBus: ActorRef, val supervisor: ActorRef, val syncConfig: SyncConfig, + val blockValidator: BlockValidator, implicit val scheduler: Scheduler ) extends Actor with ActorLogging { @@ -54,7 +56,7 @@ class BlockFetcher( } private def idle(): Receive = handleCommonMessages(None) orElse { case Start(importer, blockNr) => - BlockFetcherState.initial(importer, blockNr) |> fetchBlocks + BlockFetcherState.initial(importer, blockValidator, blockNr) |> fetchBlocks peerEventBus ! Subscribe( MessageClassifier( Set(NewBlock.code63, NewBlock.code64, NewBlockHashes.code, BlockHeaders.code), @@ -363,9 +365,10 @@ object BlockFetcher { peerEventBus: ActorRef, supervisor: ActorRef, syncConfig: SyncConfig, + blockValidator: BlockValidator, scheduler: Scheduler ): Props = - Props(new BlockFetcher(peersClient, peerEventBus, supervisor, syncConfig, scheduler)) + Props(new BlockFetcher(peersClient, peerEventBus, supervisor, syncConfig, blockValidator, scheduler)) sealed trait FetchMsg case class Start(importer: ActorRef, fromBlock: BigInt) extends FetchMsg diff --git a/src/main/scala/io/iohk/ethereum/blockchain/sync/regular/BlockFetcherState.scala b/src/main/scala/io/iohk/ethereum/blockchain/sync/regular/BlockFetcherState.scala index e6e46c89c2..55a5898736 100644 --- a/src/main/scala/io/iohk/ethereum/blockchain/sync/regular/BlockFetcherState.scala +++ b/src/main/scala/io/iohk/ethereum/blockchain/sync/regular/BlockFetcherState.scala @@ -3,8 +3,7 @@ package io.iohk.ethereum.blockchain.sync.regular import akka.actor.ActorRef import akka.util.ByteString import cats.data.NonEmptyList -//FIXME: By using this class, we are coupling sync process with a specific consensus (the standard one). -import io.iohk.ethereum.consensus.validators.std.StdBlockValidator +import io.iohk.ethereum.consensus.validators.BlockValidator import io.iohk.ethereum.domain.{Block, BlockHeader, BlockBody, HeadersSeq} import io.iohk.ethereum.network.PeerId import io.iohk.ethereum.network.p2p.messages.PV62.BlockHash @@ -14,6 +13,7 @@ import cats.syntax.option._ import scala.collection.immutable.Queue import scala.annotation.tailrec +import io.iohk.ethereum.consensus.validators.BlockValidator // scalastyle:off number.of.methods /** @@ -37,6 +37,7 @@ import scala.annotation.tailrec */ case class BlockFetcherState( importer: ActorRef, + blockValidator: BlockValidator, readyBlocks: Queue[Block], waitingHeaders: Queue[BlockHeader], fetchingHeadersState: FetchingHeadersState, @@ -121,7 +122,7 @@ case class BlockFetcherState( case (Seq(), _ +: _) => None case (_, Seq()) => Some(matchedBlocks) case (header +: remainingHeaders, body +: remainingBodies) => - val doMatch = StdBlockValidator.validateHeaderAndBody(header, body).isRight + val doMatch = blockValidator.validateHeaderAndBody(header, body).isRight if (doMatch) bodiesAreOrderedSubsetOfRequested(remainingHeaders, remainingBodies, matchedBlocks :+ Block(header, body)) else @@ -241,17 +242,19 @@ case class BlockFetcherState( object BlockFetcherState { case class StateNodeFetcher(hash: ByteString, replyTo: ActorRef) - def initial(importer: ActorRef, lastBlock: BigInt): BlockFetcherState = BlockFetcherState( - importer = importer, - readyBlocks = Queue(), - waitingHeaders = Queue(), - fetchingHeadersState = NotFetchingHeaders, - fetchingBodiesState = NotFetchingBodies, - stateNodeFetcher = None, - lastBlock = lastBlock, - knownTop = lastBlock + 1, - blockProviders = Map() - ) + def initial(importer: ActorRef, blockValidator: BlockValidator, lastBlock: BigInt): BlockFetcherState = + BlockFetcherState( + importer = importer, + blockValidator = blockValidator, + readyBlocks = Queue(), + waitingHeaders = Queue(), + fetchingHeadersState = NotFetchingHeaders, + fetchingBodiesState = NotFetchingBodies, + stateNodeFetcher = None, + lastBlock = lastBlock, + knownTop = lastBlock + 1, + blockProviders = Map() + ) trait FetchingHeadersState case object NotFetchingHeaders extends FetchingHeadersState diff --git a/src/main/scala/io/iohk/ethereum/blockchain/sync/regular/RegularSync.scala b/src/main/scala/io/iohk/ethereum/blockchain/sync/regular/RegularSync.scala index 7109e26917..c7bc040da8 100644 --- a/src/main/scala/io/iohk/ethereum/blockchain/sync/regular/RegularSync.scala +++ b/src/main/scala/io/iohk/ethereum/blockchain/sync/regular/RegularSync.scala @@ -9,6 +9,7 @@ import io.iohk.ethereum.blockchain.sync.regular.RegularSync.{NewCheckpoint, Prog import io.iohk.ethereum.blockchain.sync.regular.BlockFetcher.InternalLastBlockImport import io.iohk.ethereum.blockchain.sync.BlockBroadcast import io.iohk.ethereum.consensus.blocks.CheckpointBlockGenerator +import io.iohk.ethereum.consensus.validators.BlockValidator import io.iohk.ethereum.crypto.ECDSASignature import io.iohk.ethereum.domain.Blockchain import io.iohk.ethereum.ledger.Ledger @@ -22,6 +23,7 @@ class RegularSync( ledger: Ledger, blockchain: Blockchain, blockchainConfig: BlockchainConfig, + blockValidator: BlockValidator, syncConfig: SyncConfig, ommersPool: ActorRef, pendingTransactionsManager: ActorRef, @@ -31,7 +33,10 @@ class RegularSync( with ActorLogging { val fetcher: ActorRef = - context.actorOf(BlockFetcher.props(peersClient, peerEventBus, self, syncConfig, scheduler), "block-fetcher") + context.actorOf( + BlockFetcher.props(peersClient, peerEventBus, self, syncConfig, blockValidator, scheduler), + "block-fetcher" + ) val broadcaster: ActorRef = context.actorOf( BlockBroadcasterActor .props(new BlockBroadcast(etcPeerManager, syncConfig), peerEventBus, etcPeerManager, syncConfig, scheduler), @@ -122,6 +127,7 @@ object RegularSync { ledger: Ledger, blockchain: Blockchain, blockchainConfig: BlockchainConfig, + blockValidator: BlockValidator, syncConfig: SyncConfig, ommersPool: ActorRef, pendingTransactionsManager: ActorRef, @@ -136,6 +142,7 @@ object RegularSync { ledger, blockchain, blockchainConfig, + blockValidator, syncConfig, ommersPool, pendingTransactionsManager, diff --git a/src/test/scala/io/iohk/ethereum/blockchain/sync/regular/BlockFetcherSpec.scala b/src/test/scala/io/iohk/ethereum/blockchain/sync/regular/BlockFetcherSpec.scala index 5f9b1abac8..124f3df795 100644 --- a/src/test/scala/io/iohk/ethereum/blockchain/sync/regular/BlockFetcherSpec.scala +++ b/src/test/scala/io/iohk/ethereum/blockchain/sync/regular/BlockFetcherSpec.scala @@ -5,6 +5,7 @@ import java.net.InetSocketAddress import akka.actor.ActorSystem import akka.testkit.{TestKit, TestProbe} import com.miguno.akka.testing.VirtualTime +import io.iohk.ethereum.Mocks.MockValidatorsAlwaysSucceed import io.iohk.ethereum.BlockHelpers import io.iohk.ethereum.Fixtures.{Blocks => FixtureBlocks} import io.iohk.ethereum.blockchain.sync.PeersClient.BlacklistPeer @@ -134,6 +135,8 @@ class BlockFetcherSpec extends TestKit(ActorSystem("BlockFetcherSpec_System")) w val importer: TestProbe = TestProbe() val regularSync: TestProbe = TestProbe() + val validators = new MockValidatorsAlwaysSucceed + override lazy val syncConfig = defaultSyncConfig.copy( // Same request size was selected for simplification purposes of the flow blockHeadersPerRequest = 10, @@ -152,6 +155,7 @@ class BlockFetcherSpec extends TestKit(ActorSystem("BlockFetcherSpec_System")) w peerEventBus.ref, regularSync.ref, syncConfig, + validators.blockValidator, time.scheduler ) ) diff --git a/src/test/scala/io/iohk/ethereum/blockchain/sync/regular/BlockFetcherStateSpec.scala b/src/test/scala/io/iohk/ethereum/blockchain/sync/regular/BlockFetcherStateSpec.scala index 3794d86a1d..754d658100 100644 --- a/src/test/scala/io/iohk/ethereum/blockchain/sync/regular/BlockFetcherStateSpec.scala +++ b/src/test/scala/io/iohk/ethereum/blockchain/sync/regular/BlockFetcherStateSpec.scala @@ -2,6 +2,7 @@ package io.iohk.ethereum.blockchain.sync.regular import akka.actor.ActorSystem import akka.testkit.{TestKit, TestProbe} +import io.iohk.ethereum.Mocks.MockValidatorsAlwaysSucceed import io.iohk.ethereum.domain.Block import io.iohk.ethereum.Fixtures.Blocks.ValidBlock import io.iohk.ethereum.network.PeerId @@ -9,11 +10,15 @@ import org.scalatest.matchers.should.Matchers import org.scalatest.wordspec.AnyWordSpecLike class BlockFetcherStateSpec extends TestKit(ActorSystem()) with AnyWordSpecLike with Matchers { + + lazy val validators = new MockValidatorsAlwaysSucceed + "BlockFetcherState" when { "invalidating blocks" should { "not allow to go to negative block number" in { val importer = TestProbe().ref - val (_, actual) = BlockFetcherState.initial(importer, 10).invalidateBlocksFrom(-5, None) + val (_, actual) = + BlockFetcherState.initial(importer, validators.blockValidator, 10).invalidateBlocksFrom(-5, None) actual.lastBlock shouldBe 0 } @@ -26,7 +31,7 @@ class BlockFetcherStateSpec extends TestKit(ActorSystem()) with AnyWordSpecLike val newBestBlock = Block(ValidBlock.header.copy(number = ValidBlock.header.number + 1), ValidBlock.body) val fakePeerId = PeerId("fake") - val currentState = BlockFetcherState.initial(importer, currentBestBlock.number) + val currentState = BlockFetcherState.initial(importer, validators.blockValidator, currentBestBlock.number) val newState = currentState.appendNewBlock(newBestBlock, fakePeerId) newState.lastBlock shouldEqual newBestBlock.number newState.blockProviders(newBestBlock.number) shouldEqual fakePeerId diff --git a/src/test/scala/io/iohk/ethereum/blockchain/sync/regular/RegularSyncFixtures.scala b/src/test/scala/io/iohk/ethereum/blockchain/sync/regular/RegularSyncFixtures.scala index 61400c18aa..5ef7c02510 100644 --- a/src/test/scala/io/iohk/ethereum/blockchain/sync/regular/RegularSyncFixtures.scala +++ b/src/test/scala/io/iohk/ethereum/blockchain/sync/regular/RegularSyncFixtures.scala @@ -68,6 +68,7 @@ trait RegularSyncFixtures { self: Matchers with AsyncMockFactory => ledger, blockchain, blockchainConfig, + validators.blockValidator, syncConfig, ommersPool.ref, pendingTransactionsManager.ref, From 85cbcf5703f54287b726144f20d977a5a83d193e Mon Sep 17 00:00:00 2001 From: mk Date: Wed, 4 Nov 2020 14:44:07 -0300 Subject: [PATCH 03/11] [ETCM-283] Add unit test for the failure scenario --- .../sync/regular/BlockFetcherSpec.scala | 32 +++++++++++++++++-- 1 file changed, 29 insertions(+), 3 deletions(-) diff --git a/src/test/scala/io/iohk/ethereum/blockchain/sync/regular/BlockFetcherSpec.scala b/src/test/scala/io/iohk/ethereum/blockchain/sync/regular/BlockFetcherSpec.scala index 124f3df795..59c25bb9a1 100644 --- a/src/test/scala/io/iohk/ethereum/blockchain/sync/regular/BlockFetcherSpec.scala +++ b/src/test/scala/io/iohk/ethereum/blockchain/sync/regular/BlockFetcherSpec.scala @@ -5,7 +5,7 @@ import java.net.InetSocketAddress import akka.actor.ActorSystem import akka.testkit.{TestKit, TestProbe} import com.miguno.akka.testing.VirtualTime -import io.iohk.ethereum.Mocks.MockValidatorsAlwaysSucceed +import io.iohk.ethereum.Mocks.{MockValidatorsAlwaysSucceed, MockValidatorsFailingOnBlockBodies} import io.iohk.ethereum.BlockHelpers import io.iohk.ethereum.Fixtures.{Blocks => FixtureBlocks} import io.iohk.ethereum.blockchain.sync.PeersClient.BlacklistPeer @@ -125,6 +125,32 @@ class BlockFetcherSpec extends TestKit(ActorSystem("BlockFetcherSpec_System")) w peersClient.expectMsgClass(classOf[BlacklistPeer]) peersClient.expectMsgPF() { case PeersClient.Request(msg, _, _) if msg == firstGetBlockHeadersRequest => () } } + + "should not append new blocks if the received data does not match" in new TestSetup { + + // Important: Here we are forcing the mismatch between request headers and received bodies + override lazy val validators = new MockValidatorsFailingOnBlockBodies + + startFetcher() + + val getBlockHeadersRequest = + GetBlockHeaders(Left(1), syncConfig.blockHeadersPerRequest, skip = 0, reverse = false) + peersClient.expectMsgPF() { case PeersClient.Request(msg, _, _) if msg == getBlockHeadersRequest => () } + + val chain = BlockHelpers.generateChain(syncConfig.blockHeadersPerRequest, FixtureBlocks.Genesis.block) + val getBlockHeadersResponse = BlockHeaders(chain.map(_.header)) + peersClient.reply(PeersClient.Response(fakePeer, getBlockHeadersResponse)) + + val getBlockBodiesRequest = GetBlockBodies(chain.map(_.hash)) + peersClient.expectMsgPF() { case PeersClient.Request(msg, _, _) if msg == getBlockBodiesRequest => () } + + // This response will be invalid given we are using a special validator! + val getBlockBodiesResponse = BlockBodies(chain.map(_.body)) + peersClient.reply(PeersClient.Response(fakePeer, getBlockBodiesResponse)) + + peersClient.expectMsgClass(classOf[BlacklistPeer]) + peersClient.expectMsgPF() { case PeersClient.Request(msg, _, _) if msg == getBlockBodiesRequest => () } + } } trait TestSetup extends TestSyncConfig { @@ -135,7 +161,7 @@ class BlockFetcherSpec extends TestKit(ActorSystem("BlockFetcherSpec_System")) w val importer: TestProbe = TestProbe() val regularSync: TestProbe = TestProbe() - val validators = new MockValidatorsAlwaysSucceed + lazy val validators = new MockValidatorsAlwaysSucceed override lazy val syncConfig = defaultSyncConfig.copy( // Same request size was selected for simplification purposes of the flow @@ -148,7 +174,7 @@ class BlockFetcherSpec extends TestKit(ActorSystem("BlockFetcherSpec_System")) w val fakePeerActor: TestProbe = TestProbe() val fakePeer = Peer(new InetSocketAddress("127.0.0.1", 9000), fakePeerActor.ref, false) - val blockFetcher = system.actorOf( + lazy val blockFetcher = system.actorOf( BlockFetcher .props( peersClient.ref, From 79be12a72aa6c4b53c21e8eed44174159ee65a52 Mon Sep 17 00:00:00 2001 From: mk Date: Fri, 13 Nov 2020 16:45:12 -0300 Subject: [PATCH 04/11] [ETCM-283] Check that blocks correspond to the requested headers that we want to process --- .../sync/regular/BlockFetcher.scala | 23 +++++----- .../sync/regular/BlockFetcherState.scala | 45 +++++++++++++++---- 2 files changed, 48 insertions(+), 20 deletions(-) diff --git a/src/main/scala/io/iohk/ethereum/blockchain/sync/regular/BlockFetcher.scala b/src/main/scala/io/iohk/ethereum/blockchain/sync/regular/BlockFetcher.scala index fcf994f3fd..6f0924ba71 100644 --- a/src/main/scala/io/iohk/ethereum/blockchain/sync/regular/BlockFetcher.scala +++ b/src/main/scala/io/iohk/ethereum/blockchain/sync/regular/BlockFetcher.scala @@ -140,24 +140,25 @@ class BlockFetcher( private def handleBodiesMessages(state: BlockFetcherState): Receive = { case Response(peer, BlockBodies(bodies)) if state.isFetchingBodies => - val newState = - if (state.fetchingBodiesState == AwaitingBodiesToBeIgnored) { - log.debug("Received {} block bodies that will be ignored", bodies.size) - state.withBodiesFetchReceived - } else { + log.debug(s"Received ${bodies.size} block bodies") + if (state.fetchingBodiesState == AwaitingBodiesToBeIgnored) { + log.debug("Block bodies will be ignored due to an invalidation was requested for them") + fetchBlocks(state.withBodiesFetchReceived) + } else { + val newState = state.validateBodies(bodies) match { case Left(err) => peersClient ! BlacklistPeer(peer.id, err) state.withBodiesFetchReceived case Right(newBlocks) => - log.debug("Fetched {} block bodies", newBlocks.size) - state.withBodiesFetchReceived.appendNewBlocks(newBlocks, peer.id) + state.withBodiesFetchReceived.receiveBlocks(newBlocks, peer.id) } - } - fetchBlocks(newState) + val waitingHeadersDequeued = newState.waitingHeaders.size - state.waitingHeaders.size + log.debug(s"Processed ${waitingHeadersDequeued} new blocks from received block bodies") + fetchBlocks(newState) + } case RetryBodiesRequest if state.isFetchingBodies => log.debug("Time-out occurred while waiting for bodies") - val newState = state.withBodiesFetchReceived fetchBlocks(newState) } @@ -214,7 +215,7 @@ class BlockFetcher( supervisor ! ProgressProtocol.GotNewBlock(newState.knownTop) context become started(newState) // there are some blocks waiting for import but it seems that we reached top on fetch side so we can enqueue new block for import - } else if (newBlockNr == nextExpectedBlock && !state.isFetching && state.waitingHeaders.isEmpty) { + } else if (newBlockNr == nextExpectedBlock && !state.isFetching) { log.debug("Enqueue new block for import") val newState = state.appendNewBlock(block, peerId) supervisor ! ProgressProtocol.GotNewBlock(newState.knownTop) diff --git a/src/main/scala/io/iohk/ethereum/blockchain/sync/regular/BlockFetcherState.scala b/src/main/scala/io/iohk/ethereum/blockchain/sync/regular/BlockFetcherState.scala index 55a5898736..225ebd864d 100644 --- a/src/main/scala/io/iohk/ethereum/blockchain/sync/regular/BlockFetcherState.scala +++ b/src/main/scala/io/iohk/ethereum/blockchain/sync/regular/BlockFetcherState.scala @@ -105,6 +105,11 @@ case class BlockFetcherState( } ) + /** + * When bodies are requested, the response don't need to be a complete sub chain, + * even more, we could receive an empty chain and that will be considered valid. Here we just + * validate that the received bodies corresponds to an ordered subset of the requested headers. + */ def validateBodies(receivedBodies: Seq[BlockBody]): Either[String, Seq[Block]] = bodiesAreOrderedSubsetOfRequested(waitingHeaders.toList, receivedBodies) .toRight( @@ -129,22 +134,44 @@ case class BlockFetcherState( bodiesAreOrderedSubsetOfRequested(remainingHeaders, respondedBodies, matchedBlocks) } - def appendNewBlocks(blocks: Seq[Block], fromPeer: PeerId): BlockFetcherState = { - val receivedHeaders = blocks.map(_.header) - withPeerForBlocks(fromPeer, blocks.map(_.header.number)) - .copy( - readyBlocks = readyBlocks.enqueue(blocks.toList), - waitingHeaders = waitingHeaders.diff(receivedHeaders) - ) + // We could optimize this method by stopping as soon as a block is not appended. + def receiveBlocks(blocks: Seq[Block], fromPeer: PeerId): BlockFetcherState = { + blocks.foldLeft(this) { case (state, block) => + state.receiveBlock(block, fromPeer) + } } + /** + * Currently to fill in headers we use a queue, so we if we try to process + * a block that has its header in the queue but is not the next in the line, + * we opt for not appending it. + */ + def receiveBlock(block: Block, fromPeer: PeerId): BlockFetcherState = + waitingHeaders.dequeueOption + .map { case (waitingHeader, waitingHeadersTail) => + if (waitingHeader.hash == block.hash) + unsafeAppendNewBlock(block, fromPeer).copy( + waitingHeaders = waitingHeadersTail + ) + else + this + } + .getOrElse(this) + + // only succeed if there is no waiting headers. def appendNewBlock(block: Block, fromPeer: PeerId): BlockFetcherState = + if (waitingHeaders.isEmpty) + unsafeAppendNewBlock(block, fromPeer) + else + this + + // unsafe in terms of not checking waiting headers queue + private def unsafeAppendNewBlock(block: Block, fromPeer: PeerId): BlockFetcherState = withPeerForBlocks(fromPeer, Seq(block.header.number)) .withPossibleNewTopAt(block.number) .withLastBlock(block.number) .copy( - readyBlocks = readyBlocks.enqueue(block), - waitingHeaders = waitingHeaders.filter(block.number != _.number) + readyBlocks = readyBlocks.enqueue(block) ) def pickBlocks(amount: Int): Option[(NonEmptyList[Block], BlockFetcherState)] = From 21d65f8d4aba1bb05b329826d425f6659ab3529a Mon Sep 17 00:00:00 2001 From: mk Date: Tue, 17 Nov 2020 12:22:57 -0300 Subject: [PATCH 05/11] [ETCM-283] Add unit tests --- .../sync/regular/BlockFetcherState.scala | 2 +- .../sync/regular/BlockFetcherSpec.scala | 69 ++++++++++++++++++- 2 files changed, 69 insertions(+), 2 deletions(-) diff --git a/src/main/scala/io/iohk/ethereum/blockchain/sync/regular/BlockFetcherState.scala b/src/main/scala/io/iohk/ethereum/blockchain/sync/regular/BlockFetcherState.scala index 225ebd864d..4d8258d04f 100644 --- a/src/main/scala/io/iohk/ethereum/blockchain/sync/regular/BlockFetcherState.scala +++ b/src/main/scala/io/iohk/ethereum/blockchain/sync/regular/BlockFetcherState.scala @@ -158,7 +158,7 @@ case class BlockFetcherState( } .getOrElse(this) - // only succeed if there is no waiting headers. + // only append if there is no waiting headers. def appendNewBlock(block: Block, fromPeer: PeerId): BlockFetcherState = if (waitingHeaders.isEmpty) unsafeAppendNewBlock(block, fromPeer) diff --git a/src/test/scala/io/iohk/ethereum/blockchain/sync/regular/BlockFetcherSpec.scala b/src/test/scala/io/iohk/ethereum/blockchain/sync/regular/BlockFetcherSpec.scala index 59c25bb9a1..c6467f2589 100644 --- a/src/test/scala/io/iohk/ethereum/blockchain/sync/regular/BlockFetcherSpec.scala +++ b/src/test/scala/io/iohk/ethereum/blockchain/sync/regular/BlockFetcherSpec.scala @@ -5,7 +5,11 @@ import java.net.InetSocketAddress import akka.actor.ActorSystem import akka.testkit.{TestKit, TestProbe} import com.miguno.akka.testing.VirtualTime -import io.iohk.ethereum.Mocks.{MockValidatorsAlwaysSucceed, MockValidatorsFailingOnBlockBodies} +import io.iohk.ethereum.Mocks.{ + MockValidatorsAlwaysSucceed, + MockValidatorsFailingOnBlockBodies, + MockValidatorsFailOnSpecificBlockNumber +} import io.iohk.ethereum.BlockHelpers import io.iohk.ethereum.Fixtures.{Blocks => FixtureBlocks} import io.iohk.ethereum.blockchain.sync.PeersClient.BlacklistPeer @@ -151,6 +155,69 @@ class BlockFetcherSpec extends TestKit(ActorSystem("BlockFetcherSpec_System")) w peersClient.expectMsgClass(classOf[BlacklistPeer]) peersClient.expectMsgPF() { case PeersClient.Request(msg, _, _) if msg == getBlockBodiesRequest => () } } + + "should be able to handle block bodies received in several parts" in new TestSetup { + + startFetcher() + + val getBlockHeadersRequest = + GetBlockHeaders(Left(1), syncConfig.blockHeadersPerRequest, skip = 0, reverse = false) + peersClient.expectMsgPF() { case PeersClient.Request(msg, _, _) if msg == getBlockHeadersRequest => () } + + val chain = BlockHelpers.generateChain(syncConfig.blockHeadersPerRequest, FixtureBlocks.Genesis.block) + + val getBlockHeadersResponse = BlockHeaders(chain.map(_.header)) + peersClient.reply(PeersClient.Response(fakePeer, getBlockHeadersResponse)) + + val getBlockBodiesRequest1 = GetBlockBodies(chain.map(_.hash)) + peersClient.expectMsgPF() { case PeersClient.Request(msg, _, _) if msg == getBlockBodiesRequest1 => () } + + // It will receive all the requested bodies, but splitted in 2 parts. + val (subChain1, subChain2) = chain.splitAt(syncConfig.blockHeadersPerRequest / 2) + + val getBlockBodiesResponse1 = BlockBodies(subChain1.map(_.body)) + peersClient.reply(PeersClient.Response(fakePeer, getBlockBodiesResponse1)) + + val getBlockHeadersRequest2 = + GetBlockHeaders(Left(subChain1.last.number + 1), syncConfig.blockHeadersPerRequest, skip = 0, reverse = false) + peersClient.expectMsgPF() { case PeersClient.Request(msg, _, _) if msg == getBlockHeadersRequest2 => () } + + val getBlockBodiesRequest2 = GetBlockBodies(subChain2.map(_.hash)) + peersClient.expectMsgPF() { case PeersClient.Request(msg, _, _) if msg == getBlockBodiesRequest2 => () } + + val getBlockBodiesResponse2 = BlockBodies(subChain2.map(_.body)) + peersClient.reply(PeersClient.Response(fakePeer, getBlockBodiesResponse2)) + + peersClient.expectNoMessage() + } + + "should ignore response, without blacklist the peer, in case a sub ordered block bodies chain is received" in new TestSetup { + + // Important: Here (in a hacky way) we are enforcing received bodies + // to be a sub ordered chain that fetcher can't append given their current state + override lazy val validators = new MockValidatorsFailOnSpecificBlockNumber(1) + + startFetcher() + + val getBlockHeadersRequest = + GetBlockHeaders(Left(1), syncConfig.blockHeadersPerRequest, skip = 0, reverse = false) + peersClient.expectMsgPF() { case PeersClient.Request(msg, _, _) if msg == getBlockHeadersRequest => () } + + val chain = BlockHelpers.generateChain(syncConfig.blockHeadersPerRequest, FixtureBlocks.Genesis.block) + + val getBlockHeadersResponse = BlockHeaders(chain.map(_.header)) + peersClient.reply(PeersClient.Response(fakePeer, getBlockHeadersResponse)) + + val getBlockBodiesRequest1 = GetBlockBodies(chain.map(_.hash)) + peersClient.expectMsgPF() { case PeersClient.Request(msg, _, _) if msg == getBlockBodiesRequest1 => () } + + val (subChain1, _) = chain.splitAt(syncConfig.blockHeadersPerRequest / 2) + + val getBlockBodiesResponse1 = BlockBodies(subChain1.map(_.body)) + peersClient.reply(PeersClient.Response(fakePeer, getBlockBodiesResponse1)) + + peersClient.expectMsgPF() { case PeersClient.Request(msg, _, _) if msg == getBlockBodiesRequest1 => () } + } } trait TestSetup extends TestSyncConfig { From 149a549c57f9a9473b3b5219211d92b3300d33de Mon Sep 17 00:00:00 2001 From: mk Date: Tue, 17 Nov 2020 12:49:32 -0300 Subject: [PATCH 06/11] [ETCM-283] Fix log message --- .../io/iohk/ethereum/blockchain/sync/regular/BlockFetcher.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/scala/io/iohk/ethereum/blockchain/sync/regular/BlockFetcher.scala b/src/main/scala/io/iohk/ethereum/blockchain/sync/regular/BlockFetcher.scala index 6f0924ba71..852d21aa89 100644 --- a/src/main/scala/io/iohk/ethereum/blockchain/sync/regular/BlockFetcher.scala +++ b/src/main/scala/io/iohk/ethereum/blockchain/sync/regular/BlockFetcher.scala @@ -153,7 +153,7 @@ class BlockFetcher( case Right(newBlocks) => state.withBodiesFetchReceived.receiveBlocks(newBlocks, peer.id) } - val waitingHeadersDequeued = newState.waitingHeaders.size - state.waitingHeaders.size + val waitingHeadersDequeued = state.waitingHeaders.size - newState.waitingHeaders.size log.debug(s"Processed ${waitingHeadersDequeued} new blocks from received block bodies") fetchBlocks(newState) } From 0a4b94a5e00d9285877a1d4927f1d1e0e8e44106 Mon Sep 17 00:00:00 2001 From: mk Date: Thu, 19 Nov 2020 13:07:27 -0300 Subject: [PATCH 07/11] [ETCM-283] Fix and Refactor tests --- .../sync/util/RegularSyncItSpecUtils.scala | 3 +- .../sync/regular/BlockFetcherState.scala | 5 +- .../sync/regular/BlockFetcherSpec.scala | 54 +++++++++++++------ .../sync/regular/BlockFetcherStateSpec.scala | 29 +++++----- 4 files changed, 55 insertions(+), 36 deletions(-) diff --git a/src/it/scala/io/iohk/ethereum/sync/util/RegularSyncItSpecUtils.scala b/src/it/scala/io/iohk/ethereum/sync/util/RegularSyncItSpecUtils.scala index 2105395b26..25f376c624 100644 --- a/src/it/scala/io/iohk/ethereum/sync/util/RegularSyncItSpecUtils.scala +++ b/src/it/scala/io/iohk/ethereum/sync/util/RegularSyncItSpecUtils.scala @@ -3,7 +3,6 @@ package io.iohk.ethereum.sync.util import akka.actor.ActorRef import akka.util.ByteString import cats.effect.Resource -import io.iohk.ethereum.Mocks.MockValidatorsAlwaysSucceed import io.iohk.ethereum.blockchain.sync.{PeersClient, SyncProtocol} import io.iohk.ethereum.blockchain.sync.regular.BlockBroadcasterActor.BroadcastBlock import io.iohk.ethereum.blockchain.sync.regular.RegularSync @@ -63,7 +62,7 @@ object RegularSyncItSpecUtils { "pending-transactions-manager" ) - lazy val validators = new MockValidatorsAlwaysSucceed + lazy val validators = buildEthashConsensus.validators lazy val regularSync = system.actorOf( RegularSync.props( diff --git a/src/main/scala/io/iohk/ethereum/blockchain/sync/regular/BlockFetcherState.scala b/src/main/scala/io/iohk/ethereum/blockchain/sync/regular/BlockFetcherState.scala index 305ab89b7b..6ce038fbf8 100644 --- a/src/main/scala/io/iohk/ethereum/blockchain/sync/regular/BlockFetcherState.scala +++ b/src/main/scala/io/iohk/ethereum/blockchain/sync/regular/BlockFetcherState.scala @@ -148,7 +148,7 @@ case class BlockFetcherState( else bodiesAreOrderedSubsetOfRequested(remainingHeaders, respondedBodies, matchedBlocks) } - + /** * If blocks is empty collection - headers in queue are removed as the cause is: * - the headers are from rejected fork and therefore it won't be possible to resolve blocks for them @@ -161,7 +161,6 @@ case class BlockFetcherState( waitingHeaders = Queue.empty ) else - // We could optimize this by stopping as soon as a block is not enqueued. blocks.foldLeft(this) { case (state, block) => state.enqueueRequestedBlock(block, fromPeer) } @@ -174,7 +173,7 @@ case class BlockFetcherState( waitingHeaders.dequeueOption .map { case (waitingHeader, waitingHeadersTail) => if (waitingHeader.hash == block.hash) - withPeerForBlocks(fromPeer, Seq(block.header.number)) + withPeerForBlocks(fromPeer, Seq(block.number)) .withPossibleNewTopAt(block.number) .withLastBlock(block.number) .copy( diff --git a/src/test/scala/io/iohk/ethereum/blockchain/sync/regular/BlockFetcherSpec.scala b/src/test/scala/io/iohk/ethereum/blockchain/sync/regular/BlockFetcherSpec.scala index ecae7a11ea..5edc0e2921 100644 --- a/src/test/scala/io/iohk/ethereum/blockchain/sync/regular/BlockFetcherSpec.scala +++ b/src/test/scala/io/iohk/ethereum/blockchain/sync/regular/BlockFetcherSpec.scala @@ -4,11 +4,7 @@ import java.net.InetSocketAddress import akka.actor.ActorSystem import akka.testkit.{TestKit, TestProbe} import com.miguno.akka.testing.VirtualTime -import io.iohk.ethereum.Mocks.{ - MockValidatorsAlwaysSucceed, - MockValidatorsFailingOnBlockBodies, - MockValidatorsFailOnSpecificBlockNumber -} +import io.iohk.ethereum.Mocks.{MockValidatorsAlwaysSucceed, MockValidatorsFailingOnBlockBodies} import io.iohk.ethereum.BlockHelpers import io.iohk.ethereum.Fixtures.{Blocks => FixtureBlocks} import io.iohk.ethereum.blockchain.sync.PeersClient.BlacklistPeer @@ -129,7 +125,7 @@ class BlockFetcherSpec extends TestKit(ActorSystem("BlockFetcherSpec_System")) w peersClient.expectMsgPF() { case PeersClient.Request(msg, _, _) if msg == firstGetBlockHeadersRequest => () } } - "should not append new blocks if the received data does not match" in new TestSetup { + "should not enqueue requested blocks if the received bodies does not match" in new TestSetup { // Important: Here we are forcing the mismatch between request headers and received bodies override lazy val validators = new MockValidatorsFailingOnBlockBodies @@ -151,8 +147,14 @@ class BlockFetcherSpec extends TestKit(ActorSystem("BlockFetcherSpec_System")) w val getBlockBodiesResponse = BlockBodies(chain.map(_.body)) peersClient.reply(PeersClient.Response(fakePeer, getBlockBodiesResponse)) + // Fetcher should blacklist the peer and retry asking for the same bodies peersClient.expectMsgClass(classOf[BlacklistPeer]) peersClient.expectMsgPF() { case PeersClient.Request(msg, _, _) if msg == getBlockBodiesRequest => () } + + // Fetcher should not enqueue any new block + importer.send(blockFetcher, PickBlocks(syncConfig.blocksBatchSize)) + importer.ignoreMsg({ case BlockImporter.NotOnTop => true }) + importer.expectNoMessage() } "should be able to handle block bodies received in several parts" in new TestSetup { @@ -172,13 +174,13 @@ class BlockFetcherSpec extends TestKit(ActorSystem("BlockFetcherSpec_System")) w peersClient.expectMsgPF() { case PeersClient.Request(msg, _, _) if msg == getBlockBodiesRequest1 => () } // It will receive all the requested bodies, but splitted in 2 parts. - val (subChain1, subChain2) = chain.splitAt(syncConfig.blockHeadersPerRequest / 2) + val (subChain1, subChain2) = chain.splitAt(syncConfig.blockBodiesPerRequest / 2) val getBlockBodiesResponse1 = BlockBodies(subChain1.map(_.body)) peersClient.reply(PeersClient.Response(fakePeer, getBlockBodiesResponse1)) val getBlockHeadersRequest2 = - GetBlockHeaders(Left(subChain1.last.number + 1), syncConfig.blockHeadersPerRequest, skip = 0, reverse = false) + GetBlockHeaders(Left(chain.last.number + 1), syncConfig.blockHeadersPerRequest, skip = 0, reverse = false) peersClient.expectMsgPF() { case PeersClient.Request(msg, _, _) if msg == getBlockHeadersRequest2 => () } val getBlockBodiesRequest2 = GetBlockBodies(subChain2.map(_.hash)) @@ -188,13 +190,16 @@ class BlockFetcherSpec extends TestKit(ActorSystem("BlockFetcherSpec_System")) w peersClient.reply(PeersClient.Response(fakePeer, getBlockBodiesResponse2)) peersClient.expectNoMessage() - } - "should ignore response, without blacklist the peer, in case a sub ordered block bodies chain is received" in new TestSetup { + // Fetcher should enqueue all the received blocks + importer.send(blockFetcher, PickBlocks(chain.size)) + importer.ignoreMsg({ case BlockImporter.NotOnTop => true }) + importer.expectMsgPF() { case BlockFetcher.PickedBlocks(blocks) => + blocks.map(_.hash).toList shouldEqual chain.map(_.hash) + } + } - // Important: Here (in a hacky way) we are enforcing received bodies - // to be a sub ordered chain that fetcher can't append given their current state - override lazy val validators = new MockValidatorsFailOnSpecificBlockNumber(1) + "should stop requesting, without blacklist the peer, in case empty bodies are received" in new TestSetup { startFetcher() @@ -210,12 +215,31 @@ class BlockFetcherSpec extends TestKit(ActorSystem("BlockFetcherSpec_System")) w val getBlockBodiesRequest1 = GetBlockBodies(chain.map(_.hash)) peersClient.expectMsgPF() { case PeersClient.Request(msg, _, _) if msg == getBlockBodiesRequest1 => () } - val (subChain1, _) = chain.splitAt(syncConfig.blockHeadersPerRequest / 2) + // It will receive part of the requested bodies. + val (subChain1, subChain2) = chain.splitAt(syncConfig.blockBodiesPerRequest / 2) val getBlockBodiesResponse1 = BlockBodies(subChain1.map(_.body)) peersClient.reply(PeersClient.Response(fakePeer, getBlockBodiesResponse1)) - peersClient.expectMsgPF() { case PeersClient.Request(msg, _, _) if msg == getBlockBodiesRequest1 => () } + val getBlockHeadersRequest2 = + GetBlockHeaders(Left(chain.last.number + 1), syncConfig.blockHeadersPerRequest, skip = 0, reverse = false) + peersClient.expectMsgPF() { case PeersClient.Request(msg, _, _) if msg == getBlockHeadersRequest2 => () } + + val getBlockBodiesRequest2 = GetBlockBodies(subChain2.map(_.hash)) + peersClient.expectMsgPF() { case PeersClient.Request(msg, _, _) if msg == getBlockBodiesRequest2 => () } + + // We receive empty bodies instead of the second part + val getBlockBodiesResponse2 = BlockBodies(List()) + peersClient.reply(PeersClient.Response(fakePeer, getBlockBodiesResponse2)) + + peersClient.expectNoMessage() + + // If we try to pick the whole chain we should only receive the first part + importer.send(blockFetcher, PickBlocks(chain.size)) + importer.ignoreMsg({ case BlockImporter.NotOnTop => true }) + importer.expectMsgPF() { case BlockFetcher.PickedBlocks(blocks) => + blocks.map(_.hash).toList shouldEqual subChain1.map(_.hash) + } } "should ensure blocks passed to importer are always forming chain" in new TestSetup { diff --git a/src/test/scala/io/iohk/ethereum/blockchain/sync/regular/BlockFetcherStateSpec.scala b/src/test/scala/io/iohk/ethereum/blockchain/sync/regular/BlockFetcherStateSpec.scala index 58bc0c769f..5480ab4d3b 100644 --- a/src/test/scala/io/iohk/ethereum/blockchain/sync/regular/BlockFetcherStateSpec.scala +++ b/src/test/scala/io/iohk/ethereum/blockchain/sync/regular/BlockFetcherStateSpec.scala @@ -3,10 +3,7 @@ package io.iohk.ethereum.blockchain.sync.regular import akka.actor.ActorSystem import akka.testkit.{TestKit, TestProbe} import io.iohk.ethereum.Mocks.MockValidatorsAlwaysSucceed -import io.iohk.ethereum.domain.Block import io.iohk.ethereum.BlockHelpers -import io.iohk.ethereum.Fixtures.Blocks.ValidBlock -import io.iohk.ethereum.domain.Block import io.iohk.ethereum.network.PeerId import org.scalatest.matchers.should.Matchers import org.scalatest.wordspec.AnyWordSpecLike @@ -30,35 +27,35 @@ class BlockFetcherStateSpec extends TestKit(ActorSystem()) with AnyWordSpecLike "handling requested blocks" should { "clear headers queue if got empty list of blocks" in { + val importer = TestProbe().ref val headers = BlockHelpers.generateChain(5, BlockHelpers.genesis).map(_.header) val peer = PeerId("foo") val result = BlockFetcherState - .initial(TestProbe().ref, 0) + .initial(importer, validators.blockValidator, 0) .appendHeaders(headers) - .map(_.handleRequestedBlocks(peer, List())) - .map(_.waitingHeaders) + .map(_.handleRequestedBlocks(List(), peer)) - assert(result === Right(Queue.empty)) - result.lastBlock shouldEqual 0 + assert(result.map(_.waitingHeaders) === Right(Queue.empty)) + assert(result.map(_.lastBlock) === Right(headers.last.number)) } "enqueue requested blocks" in { - + val importer = TestProbe().ref val blocks = BlockHelpers.generateChain(5, BlockHelpers.genesis) val peer = PeerId("foo") val result = BlockFetcherState - .initial(TestProbe().ref, 0) + .initial(importer, validators.blockValidator, 0) .appendHeaders(blocks.map(_.header)) - .map(_.handleRequestedBlocks(peer, blocks.map(_.body))) + .map(_.handleRequestedBlocks(blocks, peer)) - assert(result.waitingHeaders === Right(Queue.empty)) - result.lastBlock shouldEqual blocks.lastBlock.number - blocks.forEach { block => - result.blockProviders(block.number) shouldEqual fakePeerId + assert(result.map(_.waitingHeaders) === Right(Queue.empty)) + assert(result.map(_.lastBlock) === Right(blocks.last.number)) + blocks.foreach { block => + assert(result.map(_.blockProviders(block.number)) === Right(peer)) } - result.knownTop shouldEqual blocks.lastBlock.number + assert(result.map(_.knownTop) === Right(blocks.last.number)) } } } From ef937c4da0441adc711255eafc134de6ba1a8ba5 Mon Sep 17 00:00:00 2001 From: mk Date: Thu, 19 Nov 2020 17:32:21 -0300 Subject: [PATCH 08/11] [ETCM-283] Remove unnecesary lastFullBlockNumber --- .../ethereum/blockchain/sync/regular/BlockFetcher.scala | 2 +- .../blockchain/sync/regular/BlockFetcherState.scala | 6 ------ 2 files changed, 1 insertion(+), 7 deletions(-) diff --git a/src/main/scala/io/iohk/ethereum/blockchain/sync/regular/BlockFetcher.scala b/src/main/scala/io/iohk/ethereum/blockchain/sync/regular/BlockFetcher.scala index 02a6709f25..5ba31de7d1 100644 --- a/src/main/scala/io/iohk/ethereum/blockchain/sync/regular/BlockFetcher.scala +++ b/src/main/scala/io/iohk/ethereum/blockchain/sync/regular/BlockFetcher.scala @@ -202,7 +202,7 @@ class BlockFetcher( //TODO ETCM-389: Handle mined, checkpoint and new blocks uniformly log.debug("Received NewBlock {}", block.idTag) val newBlockNr = block.number - val nextExpectedBlock = state.lastFullBlockNumber + 1 + val nextExpectedBlock = state.lastBlock + 1 if (state.isOnTop && newBlockNr == nextExpectedBlock) { log.debug("Passing block directly to importer") diff --git a/src/main/scala/io/iohk/ethereum/blockchain/sync/regular/BlockFetcherState.scala b/src/main/scala/io/iohk/ethereum/blockchain/sync/regular/BlockFetcherState.scala index 6ce038fbf8..c006f27148 100644 --- a/src/main/scala/io/iohk/ethereum/blockchain/sync/regular/BlockFetcherState.scala +++ b/src/main/scala/io/iohk/ethereum/blockchain/sync/regular/BlockFetcherState.scala @@ -59,12 +59,6 @@ case class BlockFetcherState( def hasReachedSize(size: Int): Boolean = (readyBlocks.size + waitingHeaders.size) >= size - def lastFullBlockNumber: BigInt = - readyBlocks.lastOption - .map(_.number) - .orElse(waitingHeaders.headOption.map(_.number - 1)) - .getOrElse(lastBlock) - def lowestBlock: BigInt = readyBlocks.headOption .map(_.number) From 071356b69ffdb0d27a0de73485d8d13611d0cf0c Mon Sep 17 00:00:00 2001 From: mk Date: Thu, 19 Nov 2020 17:38:08 -0300 Subject: [PATCH 09/11] [ETCM-283] Fix it --- .../io/iohk/ethereum/sync/util/RegularSyncItSpecUtils.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/src/it/scala/io/iohk/ethereum/sync/util/RegularSyncItSpecUtils.scala b/src/it/scala/io/iohk/ethereum/sync/util/RegularSyncItSpecUtils.scala index 25f376c624..4f6ba21635 100644 --- a/src/it/scala/io/iohk/ethereum/sync/util/RegularSyncItSpecUtils.scala +++ b/src/it/scala/io/iohk/ethereum/sync/util/RegularSyncItSpecUtils.scala @@ -3,6 +3,7 @@ package io.iohk.ethereum.sync.util import akka.actor.ActorRef import akka.util.ByteString import cats.effect.Resource +import io.iohk.ethereum.Mocks.MockValidatorsAlwaysSucceed import io.iohk.ethereum.blockchain.sync.{PeersClient, SyncProtocol} import io.iohk.ethereum.blockchain.sync.regular.BlockBroadcasterActor.BroadcastBlock import io.iohk.ethereum.blockchain.sync.regular.RegularSync From 2b0edd7c49688e08342a83df504b3bebc6ddebf9 Mon Sep 17 00:00:00 2001 From: mk Date: Tue, 24 Nov 2020 08:49:06 -0300 Subject: [PATCH 10/11] [ETCM-283] Tests clean up --- .../ethereum/blockchain/sync/regular/BlockFetcherSpec.scala | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/src/test/scala/io/iohk/ethereum/blockchain/sync/regular/BlockFetcherSpec.scala b/src/test/scala/io/iohk/ethereum/blockchain/sync/regular/BlockFetcherSpec.scala index c6c0ff669f..31543d7262 100644 --- a/src/test/scala/io/iohk/ethereum/blockchain/sync/regular/BlockFetcherSpec.scala +++ b/src/test/scala/io/iohk/ethereum/blockchain/sync/regular/BlockFetcherSpec.scala @@ -154,7 +154,7 @@ class BlockFetcherSpec extends TestKit(ActorSystem("BlockFetcherSpec_System")) w // Fetcher should not enqueue any new block importer.send(blockFetcher, PickBlocks(syncConfig.blocksBatchSize)) importer.ignoreMsg({ case BlockImporter.NotOnTop => true }) - importer.expectNoMessage() + importer.expectNoMessage(100.millis) } "should be able to handle block bodies received in several parts" in new TestSetup { @@ -189,8 +189,6 @@ class BlockFetcherSpec extends TestKit(ActorSystem("BlockFetcherSpec_System")) w val getBlockBodiesResponse2 = BlockBodies(subChain2.map(_.body)) peersClient.reply(PeersClient.Response(fakePeer, getBlockBodiesResponse2)) - peersClient.expectNoMessage() - // Fetcher should enqueue all the received blocks importer.send(blockFetcher, PickBlocks(chain.size)) importer.ignoreMsg({ case BlockImporter.NotOnTop => true }) @@ -232,8 +230,6 @@ class BlockFetcherSpec extends TestKit(ActorSystem("BlockFetcherSpec_System")) w val getBlockBodiesResponse2 = BlockBodies(List()) peersClient.reply(PeersClient.Response(fakePeer, getBlockBodiesResponse2)) - peersClient.expectNoMessage() - // If we try to pick the whole chain we should only receive the first part importer.send(blockFetcher, PickBlocks(chain.size)) importer.ignoreMsg({ case BlockImporter.NotOnTop => true }) From 5f0e5c29790816fdaed4520bb935d3f8f0fc5bff Mon Sep 17 00:00:00 2001 From: mk Date: Tue, 24 Nov 2020 09:45:35 -0300 Subject: [PATCH 11/11] [ETCM-283] Add Thread.sleep in order to fix test --- .../ethereum/blockchain/sync/regular/BlockFetcherSpec.scala | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/test/scala/io/iohk/ethereum/blockchain/sync/regular/BlockFetcherSpec.scala b/src/test/scala/io/iohk/ethereum/blockchain/sync/regular/BlockFetcherSpec.scala index 31543d7262..a080e2434f 100644 --- a/src/test/scala/io/iohk/ethereum/blockchain/sync/regular/BlockFetcherSpec.scala +++ b/src/test/scala/io/iohk/ethereum/blockchain/sync/regular/BlockFetcherSpec.scala @@ -5,7 +5,7 @@ import akka.actor.ActorSystem import akka.testkit.{TestKit, TestProbe} import com.miguno.akka.testing.VirtualTime import io.iohk.ethereum.Mocks.{MockValidatorsAlwaysSucceed, MockValidatorsFailingOnBlockBodies} -import io.iohk.ethereum.BlockHelpers +import io.iohk.ethereum.{BlockHelpers, Timeouts} import io.iohk.ethereum.Fixtures.{Blocks => FixtureBlocks} import io.iohk.ethereum.blockchain.sync.PeersClient.BlacklistPeer import io.iohk.ethereum.blockchain.sync.regular.BlockFetcher.{InternalLastBlockImport, InvalidateBlocksFrom, PickBlocks} @@ -189,6 +189,9 @@ class BlockFetcherSpec extends TestKit(ActorSystem("BlockFetcherSpec_System")) w val getBlockBodiesResponse2 = BlockBodies(subChain2.map(_.body)) peersClient.reply(PeersClient.Response(fakePeer, getBlockBodiesResponse2)) + // We need to wait a while in order to allow fetcher to process all the blocks + Thread.sleep(Timeouts.shortTimeout.toMillis) + // Fetcher should enqueue all the received blocks importer.send(blockFetcher, PickBlocks(chain.size)) importer.ignoreMsg({ case BlockImporter.NotOnTop => true })