From 4696aa8f6e1159e1116352cd3c7f37495951dd8a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pawe=C5=82=20=C5=9Al=C4=85ski?= Date: Tue, 12 Jan 2021 17:27:40 +0100 Subject: [PATCH] fixed block fetcher failing test --- .../sync/regular/BlockFetcherState.scala | 6 +++++- .../sync/regular/BlockFetcherSpec.scala | 20 +++++++++++++------ 2 files changed, 19 insertions(+), 7 deletions(-) diff --git a/src/main/scala/io/iohk/ethereum/blockchain/sync/regular/BlockFetcherState.scala b/src/main/scala/io/iohk/ethereum/blockchain/sync/regular/BlockFetcherState.scala index dcc05aa872..b4994b6abe 100644 --- a/src/main/scala/io/iohk/ethereum/blockchain/sync/regular/BlockFetcherState.scala +++ b/src/main/scala/io/iohk/ethereum/blockchain/sync/regular/BlockFetcherState.scala @@ -100,8 +100,12 @@ case class BlockFetcherState( .withKnownTopAt(block.number) Right(newState) } else if (isExistInWaitingHeaders(block.header.parentHash)) { + // ignore already requested bodies + val newFetchingBodiesState = + if (fetchingBodiesState == AwaitingBodies) AwaitingBodiesToBeIgnored else fetchingBodiesState val newState = copy( - waitingHeaders = waitingHeaders.takeWhile(_.number < block.number).enqueue(block.header) + waitingHeaders = waitingHeaders.takeWhile(_.number < block.number).enqueue(block.header), + fetchingBodiesState = newFetchingBodiesState ) .withKnownTopAt(block.number) Right(newState) diff --git a/src/test/scala/io/iohk/ethereum/blockchain/sync/regular/BlockFetcherSpec.scala b/src/test/scala/io/iohk/ethereum/blockchain/sync/regular/BlockFetcherSpec.scala index bb40172eb6..15f2b61156 100644 --- a/src/test/scala/io/iohk/ethereum/blockchain/sync/regular/BlockFetcherSpec.scala +++ b/src/test/scala/io/iohk/ethereum/blockchain/sync/regular/BlockFetcherSpec.scala @@ -320,17 +320,25 @@ class BlockFetcherSpec handleFirstBlockBatchBodies() + // second bodies request + val secondGetBlockBodiesRequest = GetBlockBodies(secondBlocksBatch.map(_.hash)) + peersClient.expectMsgPF() { case PeersClient.Request(msg, _, _) if msg == secondGetBlockBodiesRequest => ()} + // send old checkpoint block blockFetcher ! MessageFromPeer(PV64.NewBlock(checkpointBlock, ChainWeight(checkpointBlock.number, checkpointBlock.header.difficulty)), fakePeer.id) - // Second bodies request - val secondGetBlockBodiesRequest = GetBlockBodies(alternativeSecondBlocksBatch.map(_.hash)) - peersClient.expectMsgPF() { case PeersClient.Request(msg, _, _) if msg == secondGetBlockBodiesRequest => () } - - // Second bodies response - val secondGetBlockBodiesResponse = BlockBodies(alternativeSecondBlocksBatch.map(_.body)) + // second bodies response + val secondGetBlockBodiesResponse = BlockBodies(secondBlocksBatch.map(_.body)) peersClient.reply(PeersClient.Response(fakePeer, secondGetBlockBodiesResponse)) + // third bodies request after adding checkpoint into the waiting headers queue + val thirdGetBlockBodiesRequest = GetBlockBodies(alternativeSecondBlocksBatch.map(_.hash)) + peersClient.expectMsgPF() { case PeersClient.Request(msg, _, _) if msg == thirdGetBlockBodiesRequest => ()} + + // third bodies response + val thirdGetBlockBodiesResponse = BlockBodies(alternativeSecondBlocksBatch.map(_.body)) + peersClient.reply(PeersClient.Response(fakePeer, thirdGetBlockBodiesResponse)) + // We need to wait a while in order to allow fetcher to process all the blocks system.scheduler.scheduleOnce(Timeouts.shortTimeout) { importer.send(blockFetcher, PickBlocks(syncConfig.blocksBatchSize * 2))