diff --git a/src/it/scala/io/iohk/ethereum/sync/RegularSyncItSpec.scala b/src/it/scala/io/iohk/ethereum/sync/RegularSyncItSpec.scala index ad1986504f..a152646a96 100644 --- a/src/it/scala/io/iohk/ethereum/sync/RegularSyncItSpec.scala +++ b/src/it/scala/io/iohk/ethereum/sync/RegularSyncItSpec.scala @@ -1,6 +1,6 @@ package io.iohk.ethereum.sync -import io.iohk.ethereum.FlatSpecBase +import io.iohk.ethereum.FreeSpecBase import io.iohk.ethereum.sync.util.RegularSyncItSpecUtils.FakePeer import io.iohk.ethereum.sync.util.SyncCommonItSpec._ import monix.execution.Scheduler @@ -9,7 +9,7 @@ import org.scalatest.matchers.should.Matchers import scala.concurrent.duration._ -class RegularSyncItSpec extends FlatSpecBase with Matchers with BeforeAndAfterAll { +class RegularSyncItSpec extends FreeSpecBase with Matchers with BeforeAndAfterAll { implicit val testScheduler = Scheduler.fixedPool("test", 16) override def afterAll(): Unit = { @@ -17,52 +17,69 @@ class RegularSyncItSpec extends FlatSpecBase with Matchers with BeforeAndAfterAl testScheduler.awaitTermination(120.second) } - it should "sync blockchain with same best block" in customTestCaseResourceM(FakePeer.start2FakePeersRes()) { - case (peer1, peer2) => - val blockNumer: Int = 2000 - for { - _ <- peer2.importBlocksUntil(blockNumer)(IdentityUpdate) - _ <- peer1.connectToPeers(Set(peer2.node)) - _ <- peer1.startRegularSync().delayExecution(50.milliseconds) - _ <- peer2.broadcastBlock()(IdentityUpdate).delayExecution(500.milliseconds) - _ <- peer1.waitForRegularSyncLoadLastBlock(blockNumer) - } yield { - assert(peer1.bl.getBestBlock().hash == peer2.bl.getBestBlock().hash) - } + "peer 2 should sync to the top of peer1 blockchain" - { + "given a previously imported blockchain" in customTestCaseResourceM(FakePeer.start2FakePeersRes()) { + case (peer1, peer2) => + val blockNumer: Int = 2000 + for { + _ <- peer1.importBlocksUntil(blockNumer)(IdentityUpdate) + _ <- peer2.startRegularSync() + _ <- peer2.connectToPeers(Set(peer1.node)) + _ <- peer2.waitForRegularSyncLoadLastBlock(blockNumer) + } yield { + assert(peer1.bl.getBestBlock().hash == peer2.bl.getBestBlock().hash) + } + } + + "given a previously mined blockchain" in customTestCaseResourceM(FakePeer.start2FakePeersRes()) { + case (peer1, peer2) => + val blockHeadersPerRequest = peer2.syncConfig.blockHeadersPerRequest + for { + _ <- peer1.startRegularSync() + _ <- peer1.mineNewBlocks(500.milliseconds, blockHeadersPerRequest + 1)(IdentityUpdate) + _ <- peer1.waitForRegularSyncLoadLastBlock(blockHeadersPerRequest + 1) + _ <- peer2.startRegularSync() + _ <- peer2.connectToPeers(Set(peer1.node)) + _ <- peer2.waitForRegularSyncLoadLastBlock(blockHeadersPerRequest + 1) + } yield { + assert(peer1.bl.getBestBlock().hash == peer2.bl.getBestBlock().hash) + } + } } - it should "sync blockchain progressing forward in the same time" in customTestCaseResourceM( + "peers should keep synced the same blockchain while their progressing forward" in customTestCaseResourceM( FakePeer.start2FakePeersRes() ) { case (peer1, peer2) => val blockNumer: Int = 2000 for { - _ <- peer2.startRegularSync().delayExecution(50.milliseconds) - _ <- peer2.importBlocksUntil(blockNumer)(IdentityUpdate) - _ <- peer1.connectToPeers(Set(peer2.node)) - _ <- peer1.startRegularSync().delayExecution(500.milliseconds) - _ <- peer2.mineNewBlocks(2000.milliseconds, 2)(IdentityUpdate) + _ <- peer1.importBlocksUntil(blockNumer)(IdentityUpdate) + _ <- peer1.startRegularSync() + _ <- peer2.startRegularSync() + _ <- peer2.connectToPeers(Set(peer1.node)) + _ <- peer2.waitForRegularSyncLoadLastBlock(blockNumer) + _ <- peer2.mineNewBlocks(100.milliseconds, 2)(IdentityUpdate) _ <- peer1.waitForRegularSyncLoadLastBlock(blockNumer + 2) + _ <- peer1.mineNewBlocks(100.milliseconds, 2)(IdentityUpdate) + _ <- peer2.waitForRegularSyncLoadLastBlock(blockNumer + 4) } yield { assert(peer1.bl.getBestBlock().hash == peer2.bl.getBestBlock().hash) } } - it should "sync peers with divergent chains will be forced to resolve branches" in customTestCaseResourceM( + "peers with divergent chains will be forced to resolve branches" in customTestCaseResourceM( FakePeer.start2FakePeersRes() ) { case (peer1, peer2) => val blockNumer: Int = 2000 for { - _ <- peer2.importBlocksUntil(blockNumer)(IdentityUpdate) - _ <- peer2.startRegularSync().delayExecution(50.milliseconds) _ <- peer1.importBlocksUntil(blockNumer)(IdentityUpdate) - _ <- peer1.startRegularSync().delayExecution(50.milliseconds) - _ <- peer2.mineNewBlock(10)(IdentityUpdate).delayExecution(500.milliseconds) - _ <- peer2.mineNewBlock(10)(IdentityUpdate).delayExecution(500.milliseconds) - _ <- peer2.mineNewBlock(10)(IdentityUpdate).delayExecution(500.milliseconds) + _ <- peer2.importBlocksUntil(blockNumer)(IdentityUpdate) + _ <- peer1.startRegularSync() + _ <- peer2.startRegularSync() + _ <- peer1.mineNewBlock()(IdentityUpdate) + _ <- peer2.mineNewBlocks(100.milliseconds, 3)(IdentityUpdate) _ <- peer2.waitForRegularSyncLoadLastBlock(blockNumer + 3) - _ <- peer1.mineNewBlock()(IdentityUpdate).delayExecution(500.milliseconds) _ <- peer1.waitForRegularSyncLoadLastBlock(blockNumer + 1) - _ <- peer1.connectToPeers(Set(peer2.node)).delayExecution(500.milliseconds) + _ <- peer2.connectToPeers(Set(peer1.node)) _ <- peer1.waitForRegularSyncLoadLastBlock(blockNumer + 3) _ <- peer2.waitForRegularSyncLoadLastBlock(blockNumer + 3) } yield { diff --git a/src/main/scala/io/iohk/ethereum/blockchain/sync/regular/BlockFetcher.scala b/src/main/scala/io/iohk/ethereum/blockchain/sync/regular/BlockFetcher.scala index 6dbe80165d..5be786a039 100644 --- a/src/main/scala/io/iohk/ethereum/blockchain/sync/regular/BlockFetcher.scala +++ b/src/main/scala/io/iohk/ethereum/blockchain/sync/regular/BlockFetcher.scala @@ -56,7 +56,10 @@ class BlockFetcher( private def idle(): Receive = handleCommonMessages(None) orElse { case Start(importer, blockNr) => BlockFetcherState.initial(importer, blockNr) |> fetchBlocks peerEventBus ! Subscribe( - MessageClassifier(Set(NewBlock.code63, NewBlock.code64, NewBlockHashes.code), PeerSelector.AllPeers) + MessageClassifier( + Set(NewBlock.code63, NewBlock.code64, NewBlockHashes.code, BlockHeaders.code), + PeerSelector.AllPeers + ) ) } @@ -71,7 +74,8 @@ class BlockFetcher( handleNewBlockMessages(state) orElse handleHeadersMessages(state) orElse handleBodiesMessages(state) orElse - handleStateNodeMessages(state) + handleStateNodeMessages(state) orElse + handlePossibleTopUpdate(state) private def handleCommands(state: BlockFetcherState): Receive = { case PickBlocks(amount) => state.pickBlocks(amount) |> handlePickedBlocks(state) |> fetchBlocks @@ -227,6 +231,24 @@ class BlockFetcher( fetchBlocks(newState) } + private def handlePossibleTopUpdate(state: BlockFetcherState): Receive = { + //by handling these type of messages, fetcher can received from network, fresh info about blocks on top + //ex. After a successful handshake, fetcher will receive the info about the header of the peer best block + case MessageFromPeer(BlockHeaders(headers), _) => + headers.lastOption.map { bh => + log.debug(s"Candidate for new top at block ${bh.number}, current know top ${state.knownTop}") + val newState = state.withPossibleNewTopAt(bh.number) + fetchBlocks(newState) + } + //keep fetcher state updated in case new checkpoint block or mined block was imported + case InternalLastBlockImport(blockNr) => { + log.debug(s"New last block $blockNr imported from the inside") + val newLastBlock = blockNr.max(state.lastBlock) + val newState = state.withLastBlock(newLastBlock).withPossibleNewTopAt(blockNr) + fetchBlocks(newState) + } + } + private def handlePickedBlocks( state: BlockFetcherState )(pickResult: Option[(NonEmptyList[Block], BlockFetcherState)]): BlockFetcherState = @@ -358,6 +380,7 @@ object BlockFetcher { new InvalidateBlocksFrom(from, reason, toBlacklist) } case class BlockImportFailed(blockNr: BigInt, reason: String) extends FetchMsg + case class InternalLastBlockImport(blockNr: BigInt) extends FetchMsg case object RetryBodiesRequest extends FetchMsg case object RetryHeadersRequest extends FetchMsg diff --git a/src/main/scala/io/iohk/ethereum/blockchain/sync/regular/BlockImporter.scala b/src/main/scala/io/iohk/ethereum/blockchain/sync/regular/BlockImporter.scala index 7a3a7511c0..bd226d9278 100644 --- a/src/main/scala/io/iohk/ethereum/blockchain/sync/regular/BlockImporter.scala +++ b/src/main/scala/io/iohk/ethereum/blockchain/sync/regular/BlockImporter.scala @@ -178,7 +178,7 @@ class BlockImporter( ): Future[(List[Block], Option[Any])] = if (blocks.isEmpty) { importedBlocks.headOption match { - case Some(block) => supervisor ! ProgressProtocol.ImportedBlock(block.number) + case Some(block) => supervisor ! ProgressProtocol.ImportedBlock(block.number, internally = false) case None => () } @@ -208,15 +208,35 @@ class BlockImporter( } private def importMinedBlock(block: Block, state: ImporterState): Unit = - importBlock(block, new MinedBlockImportMessages(block), informFetcherOnFail = false)(state) + importBlock( + block, + new MinedBlockImportMessages(block), + informFetcherOnFail = false, + internally = true + )(state) private def importCheckpointBlock(block: Block, state: ImporterState): Unit = - importBlock(block, new CheckpointBlockImportMessages(block), informFetcherOnFail = false)(state) + importBlock( + block, + new CheckpointBlockImportMessages(block), + informFetcherOnFail = false, + internally = true + )(state) private def importNewBlock(block: Block, peerId: PeerId, state: ImporterState): Unit = - importBlock(block, new NewBlockImportMessages(block, peerId), informFetcherOnFail = true)(state) - - private def importBlock(block: Block, importMessages: ImportMessages, informFetcherOnFail: Boolean): ImportFn = { + importBlock( + block, + new NewBlockImportMessages(block, peerId), + informFetcherOnFail = true, + internally = false + )(state) + + private def importBlock( + block: Block, + importMessages: ImportMessages, + informFetcherOnFail: Boolean, + internally: Boolean + ): ImportFn = { def doLog(entry: ImportMessages.LogEntry): Unit = log.log(entry._1, entry._2) importWith { @@ -229,7 +249,7 @@ class BlockImporter( val (blocks, tds) = importedBlocksData.map(data => (data.block, data.td)).unzip broadcastBlocks(blocks, tds) updateTxPool(importedBlocksData.map(_.block), Seq.empty) - supervisor ! ProgressProtocol.ImportedBlock(block.number) + supervisor ! ProgressProtocol.ImportedBlock(block.number, internally) case BlockEnqueued => () @@ -241,7 +261,7 @@ class BlockImporter( updateTxPool(newBranch, oldBranch) broadcastBlocks(newBranch, totalDifficulties) newBranch.lastOption match { - case Some(newBlock) => supervisor ! ProgressProtocol.ImportedBlock(newBlock.number) + case Some(newBlock) => supervisor ! ProgressProtocol.ImportedBlock(newBlock.number, internally) case None => () } diff --git a/src/main/scala/io/iohk/ethereum/blockchain/sync/regular/RegularSync.scala b/src/main/scala/io/iohk/ethereum/blockchain/sync/regular/RegularSync.scala index 395c94a853..7109e26917 100644 --- a/src/main/scala/io/iohk/ethereum/blockchain/sync/regular/RegularSync.scala +++ b/src/main/scala/io/iohk/ethereum/blockchain/sync/regular/RegularSync.scala @@ -6,6 +6,7 @@ import io.iohk.ethereum.blockchain.sync.{BlockBroadcast, SyncProtocol} import io.iohk.ethereum.blockchain.sync.SyncProtocol.Status import io.iohk.ethereum.blockchain.sync.SyncProtocol.Status.Progress import io.iohk.ethereum.blockchain.sync.regular.RegularSync.{NewCheckpoint, ProgressProtocol, ProgressState} +import io.iohk.ethereum.blockchain.sync.regular.BlockFetcher.InternalLastBlockImport import io.iohk.ethereum.blockchain.sync.BlockBroadcast import io.iohk.ethereum.consensus.blocks.CheckpointBlockGenerator import io.iohk.ethereum.crypto.ECDSASignature @@ -86,13 +87,20 @@ class RegularSync( case SyncProtocol.GetStatus => sender() ! progressState.toStatus - case msg: ProgressProtocol => - val newState = msg match { - case ProgressProtocol.StartedFetching => progressState.copy(startedFetching = true) - case ProgressProtocol.StartingFrom(blockNumber) => - progressState.copy(initialBlock = blockNumber, currentBlock = blockNumber) - case ProgressProtocol.GotNewBlock(blockNumber) => progressState.copy(bestKnownNetworkBlock = blockNumber) - case ProgressProtocol.ImportedBlock(blockNumber) => progressState.copy(currentBlock = blockNumber) + + case ProgressProtocol.StartedFetching => + val newState = progressState.copy(startedFetching = true) + context become running(newState) + case ProgressProtocol.StartingFrom(blockNumber) => + val newState = progressState.copy(initialBlock = blockNumber, currentBlock = blockNumber) + context become running(newState) + case ProgressProtocol.GotNewBlock(blockNumber) => + val newState = progressState.copy(bestKnownNetworkBlock = blockNumber) + context become running(newState) + case ProgressProtocol.ImportedBlock(blockNumber, internally) => + val newState = progressState.copy(currentBlock = blockNumber) + if (internally) { + fetcher ! InternalLastBlockImport(blockNumber) } context become running(newState) } @@ -159,6 +167,6 @@ object RegularSync { case object StartedFetching extends ProgressProtocol case class StartingFrom(blockNumber: BigInt) extends ProgressProtocol case class GotNewBlock(blockNumber: BigInt) extends ProgressProtocol - case class ImportedBlock(blockNumber: BigInt) extends ProgressProtocol + case class ImportedBlock(blockNumber: BigInt, internally: Boolean) extends ProgressProtocol } } diff --git a/src/test/scala/io/iohk/ethereum/blockchain/sync/regular/BlockFetcherSpec.scala b/src/test/scala/io/iohk/ethereum/blockchain/sync/regular/BlockFetcherSpec.scala index 60a2403d0a..5f9b1abac8 100644 --- a/src/test/scala/io/iohk/ethereum/blockchain/sync/regular/BlockFetcherSpec.scala +++ b/src/test/scala/io/iohk/ethereum/blockchain/sync/regular/BlockFetcherSpec.scala @@ -51,7 +51,6 @@ class BlockFetcherSpec extends TestKit(ActorSystem("BlockFetcherSpec_System")) w val firstGetBlockBodiesResponse = BlockBodies(firstBlocksBatch.map(_.body)) peersClient.reply(PeersClient.Response(fakePeer, firstGetBlockBodiesResponse)) - // Trigger sync (to be improved with ETCM-248) triggerFetching() // Second headers request with response pending @@ -99,7 +98,6 @@ class BlockFetcherSpec extends TestKit(ActorSystem("BlockFetcherSpec_System")) w val firstGetBlockBodiesResponse = BlockBodies(firstBlocksBatch.map(_.body)) peersClient.reply(PeersClient.Response(fakePeer, firstGetBlockBodiesResponse)) - // Trigger sync (to be improved with ETCM-248) triggerFetching() // Second headers request with response pending @@ -162,7 +160,12 @@ class BlockFetcherSpec extends TestKit(ActorSystem("BlockFetcherSpec_System")) w blockFetcher ! BlockFetcher.Start(importer.ref, 0) peerEventBus.expectMsg( - Subscribe(MessageClassifier(Set(NewBlock.code63, NewBlock.code64, NewBlockHashes.code), PeerSelector.AllPeers)) + Subscribe( + MessageClassifier( + Set(NewBlock.code63, NewBlock.code64, NewBlockHashes.code, BlockHeaders.code), + PeerSelector.AllPeers + ) + ) ) } diff --git a/src/test/scala/io/iohk/ethereum/blockchain/sync/regular/RegularSyncSpec.scala b/src/test/scala/io/iohk/ethereum/blockchain/sync/regular/RegularSyncSpec.scala index 97561d878f..c23efc67d0 100644 --- a/src/test/scala/io/iohk/ethereum/blockchain/sync/regular/RegularSyncSpec.scala +++ b/src/test/scala/io/iohk/ethereum/blockchain/sync/regular/RegularSyncSpec.scala @@ -68,12 +68,15 @@ class RegularSyncSpec "Regular Sync" when { "initializing" should { - "subscribe for new blocks and new hashes" in sync(new Fixture(testSystem) { + "subscribe for new blocks, new hashes and new block headers" in sync(new Fixture(testSystem) { regularSync ! SyncProtocol.Start peerEventBus.expectMsg( PeerEventBusActor.Subscribe( - MessageClassifier(Set(NewBlock.code63, NewBlock.code64, NewBlockHashes.code), PeerSelector.AllPeers) + MessageClassifier( + Set(NewBlock.code63, NewBlock.code64, NewBlockHashes.code, BlockHeaders.code), + PeerSelector.AllPeers + ) ) ) })