Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 46 additions & 29 deletions src/it/scala/io/iohk/ethereum/sync/RegularSyncItSpec.scala
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package io.iohk.ethereum.sync

import io.iohk.ethereum.FlatSpecBase
import io.iohk.ethereum.FreeSpecBase
import io.iohk.ethereum.sync.util.RegularSyncItSpecUtils.FakePeer
import io.iohk.ethereum.sync.util.SyncCommonItSpec._
import monix.execution.Scheduler
Expand All @@ -9,60 +9,77 @@ import org.scalatest.matchers.should.Matchers

import scala.concurrent.duration._

class RegularSyncItSpec extends FlatSpecBase with Matchers with BeforeAndAfterAll {
class RegularSyncItSpec extends FreeSpecBase with Matchers with BeforeAndAfterAll {
implicit val testScheduler = Scheduler.fixedPool("test", 16)

override def afterAll(): Unit = {
testScheduler.shutdown()
testScheduler.awaitTermination(120.second)
}

it should "sync blockchain with same best block" in customTestCaseResourceM(FakePeer.start2FakePeersRes()) {
case (peer1, peer2) =>
val blockNumer: Int = 2000
for {
_ <- peer2.importBlocksUntil(blockNumer)(IdentityUpdate)
_ <- peer1.connectToPeers(Set(peer2.node))
_ <- peer1.startRegularSync().delayExecution(50.milliseconds)
_ <- peer2.broadcastBlock()(IdentityUpdate).delayExecution(500.milliseconds)
_ <- peer1.waitForRegularSyncLoadLastBlock(blockNumer)
} yield {
assert(peer1.bl.getBestBlock().hash == peer2.bl.getBestBlock().hash)
}
"peer 2 should sync to the top of peer1 blockchain" - {
"given a previously imported blockchain" in customTestCaseResourceM(FakePeer.start2FakePeersRes()) {
case (peer1, peer2) =>
val blockNumer: Int = 2000
for {
_ <- peer1.importBlocksUntil(blockNumer)(IdentityUpdate)
_ <- peer2.startRegularSync()
_ <- peer2.connectToPeers(Set(peer1.node))
_ <- peer2.waitForRegularSyncLoadLastBlock(blockNumer)
} yield {
assert(peer1.bl.getBestBlock().hash == peer2.bl.getBestBlock().hash)
}
}

"given a previously mined blockchain" in customTestCaseResourceM(FakePeer.start2FakePeersRes()) {
case (peer1, peer2) =>
val blockHeadersPerRequest = peer2.syncConfig.blockHeadersPerRequest
for {
_ <- peer1.startRegularSync()
_ <- peer1.mineNewBlocks(500.milliseconds, blockHeadersPerRequest + 1)(IdentityUpdate)
_ <- peer1.waitForRegularSyncLoadLastBlock(blockHeadersPerRequest + 1)
_ <- peer2.startRegularSync()
_ <- peer2.connectToPeers(Set(peer1.node))
_ <- peer2.waitForRegularSyncLoadLastBlock(blockHeadersPerRequest + 1)
} yield {
assert(peer1.bl.getBestBlock().hash == peer2.bl.getBestBlock().hash)
}
}
}

it should "sync blockchain progressing forward in the same time" in customTestCaseResourceM(
"peers should keep synced the same blockchain while their progressing forward" in customTestCaseResourceM(
FakePeer.start2FakePeersRes()
) { case (peer1, peer2) =>
val blockNumer: Int = 2000
for {
_ <- peer2.startRegularSync().delayExecution(50.milliseconds)
_ <- peer2.importBlocksUntil(blockNumer)(IdentityUpdate)
_ <- peer1.connectToPeers(Set(peer2.node))
_ <- peer1.startRegularSync().delayExecution(500.milliseconds)
_ <- peer2.mineNewBlocks(2000.milliseconds, 2)(IdentityUpdate)
_ <- peer1.importBlocksUntil(blockNumer)(IdentityUpdate)
_ <- peer1.startRegularSync()
_ <- peer2.startRegularSync()
_ <- peer2.connectToPeers(Set(peer1.node))
_ <- peer2.waitForRegularSyncLoadLastBlock(blockNumer)
_ <- peer2.mineNewBlocks(100.milliseconds, 2)(IdentityUpdate)
_ <- peer1.waitForRegularSyncLoadLastBlock(blockNumer + 2)
_ <- peer1.mineNewBlocks(100.milliseconds, 2)(IdentityUpdate)
_ <- peer2.waitForRegularSyncLoadLastBlock(blockNumer + 4)
} yield {
assert(peer1.bl.getBestBlock().hash == peer2.bl.getBestBlock().hash)
}
}

it should "sync peers with divergent chains will be forced to resolve branches" in customTestCaseResourceM(
"peers with divergent chains will be forced to resolve branches" in customTestCaseResourceM(
FakePeer.start2FakePeersRes()
) { case (peer1, peer2) =>
val blockNumer: Int = 2000
for {
_ <- peer2.importBlocksUntil(blockNumer)(IdentityUpdate)
_ <- peer2.startRegularSync().delayExecution(50.milliseconds)
_ <- peer1.importBlocksUntil(blockNumer)(IdentityUpdate)
_ <- peer1.startRegularSync().delayExecution(50.milliseconds)
_ <- peer2.mineNewBlock(10)(IdentityUpdate).delayExecution(500.milliseconds)
_ <- peer2.mineNewBlock(10)(IdentityUpdate).delayExecution(500.milliseconds)
_ <- peer2.mineNewBlock(10)(IdentityUpdate).delayExecution(500.milliseconds)
_ <- peer2.importBlocksUntil(blockNumer)(IdentityUpdate)
_ <- peer1.startRegularSync()
_ <- peer2.startRegularSync()
_ <- peer1.mineNewBlock()(IdentityUpdate)
_ <- peer2.mineNewBlocks(100.milliseconds, 3)(IdentityUpdate)
_ <- peer2.waitForRegularSyncLoadLastBlock(blockNumer + 3)
_ <- peer1.mineNewBlock()(IdentityUpdate).delayExecution(500.milliseconds)
_ <- peer1.waitForRegularSyncLoadLastBlock(blockNumer + 1)
_ <- peer1.connectToPeers(Set(peer2.node)).delayExecution(500.milliseconds)
_ <- peer2.connectToPeers(Set(peer1.node))
_ <- peer1.waitForRegularSyncLoadLastBlock(blockNumer + 3)
_ <- peer2.waitForRegularSyncLoadLastBlock(blockNumer + 3)
} yield {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,10 @@ class BlockFetcher(
private def idle(): Receive = handleCommonMessages(None) orElse { case Start(importer, blockNr) =>
BlockFetcherState.initial(importer, blockNr) |> fetchBlocks
peerEventBus ! Subscribe(
MessageClassifier(Set(NewBlock.code63, NewBlock.code64, NewBlockHashes.code), PeerSelector.AllPeers)
MessageClassifier(
Set(NewBlock.code63, NewBlock.code64, NewBlockHashes.code, BlockHeaders.code),
PeerSelector.AllPeers
)
)
}

Expand All @@ -71,7 +74,8 @@ class BlockFetcher(
handleNewBlockMessages(state) orElse
handleHeadersMessages(state) orElse
handleBodiesMessages(state) orElse
handleStateNodeMessages(state)
handleStateNodeMessages(state) orElse
handlePossibleTopUpdate(state)

private def handleCommands(state: BlockFetcherState): Receive = {
case PickBlocks(amount) => state.pickBlocks(amount) |> handlePickedBlocks(state) |> fetchBlocks
Expand Down Expand Up @@ -227,6 +231,24 @@ class BlockFetcher(
fetchBlocks(newState)
}

private def handlePossibleTopUpdate(state: BlockFetcherState): Receive = {
//by handling these type of messages, fetcher can received from network, fresh info about blocks on top
//ex. After a successful handshake, fetcher will receive the info about the header of the peer best block
case MessageFromPeer(BlockHeaders(headers), _) =>
headers.lastOption.map { bh =>
log.debug(s"Candidate for new top at block ${bh.number}, current know top ${state.knownTop}")
val newState = state.withPossibleNewTopAt(bh.number)
fetchBlocks(newState)
}
//keep fetcher state updated in case new checkpoint block or mined block was imported
case InternalLastBlockImport(blockNr) => {
log.debug(s"New last block $blockNr imported from the inside")
val newLastBlock = blockNr.max(state.lastBlock)
val newState = state.withLastBlock(newLastBlock).withPossibleNewTopAt(blockNr)
fetchBlocks(newState)
}
}

private def handlePickedBlocks(
state: BlockFetcherState
)(pickResult: Option[(NonEmptyList[Block], BlockFetcherState)]): BlockFetcherState =
Expand Down Expand Up @@ -358,6 +380,7 @@ object BlockFetcher {
new InvalidateBlocksFrom(from, reason, toBlacklist)
}
case class BlockImportFailed(blockNr: BigInt, reason: String) extends FetchMsg
case class InternalLastBlockImport(blockNr: BigInt) extends FetchMsg
case object RetryBodiesRequest extends FetchMsg
case object RetryHeadersRequest extends FetchMsg

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ class BlockImporter(
): Future[(List[Block], Option[Any])] =
if (blocks.isEmpty) {
importedBlocks.headOption match {
case Some(block) => supervisor ! ProgressProtocol.ImportedBlock(block.number)
case Some(block) => supervisor ! ProgressProtocol.ImportedBlock(block.number, internally = false)
case None => ()
}

Expand Down Expand Up @@ -208,15 +208,35 @@ class BlockImporter(
}

private def importMinedBlock(block: Block, state: ImporterState): Unit =
importBlock(block, new MinedBlockImportMessages(block), informFetcherOnFail = false)(state)
importBlock(
block,
new MinedBlockImportMessages(block),
informFetcherOnFail = false,
internally = true
)(state)

private def importCheckpointBlock(block: Block, state: ImporterState): Unit =
importBlock(block, new CheckpointBlockImportMessages(block), informFetcherOnFail = false)(state)
importBlock(
block,
new CheckpointBlockImportMessages(block),
informFetcherOnFail = false,
internally = true
)(state)

private def importNewBlock(block: Block, peerId: PeerId, state: ImporterState): Unit =
importBlock(block, new NewBlockImportMessages(block, peerId), informFetcherOnFail = true)(state)

private def importBlock(block: Block, importMessages: ImportMessages, informFetcherOnFail: Boolean): ImportFn = {
importBlock(
block,
new NewBlockImportMessages(block, peerId),
informFetcherOnFail = true,
internally = false
)(state)

private def importBlock(
block: Block,
importMessages: ImportMessages,
informFetcherOnFail: Boolean,
internally: Boolean
): ImportFn = {
def doLog(entry: ImportMessages.LogEntry): Unit = log.log(entry._1, entry._2)

importWith {
Expand All @@ -229,7 +249,7 @@ class BlockImporter(
val (blocks, tds) = importedBlocksData.map(data => (data.block, data.td)).unzip
broadcastBlocks(blocks, tds)
updateTxPool(importedBlocksData.map(_.block), Seq.empty)
supervisor ! ProgressProtocol.ImportedBlock(block.number)
supervisor ! ProgressProtocol.ImportedBlock(block.number, internally)

case BlockEnqueued => ()

Expand All @@ -241,7 +261,7 @@ class BlockImporter(
updateTxPool(newBranch, oldBranch)
broadcastBlocks(newBranch, totalDifficulties)
newBranch.lastOption match {
case Some(newBlock) => supervisor ! ProgressProtocol.ImportedBlock(newBlock.number)
case Some(newBlock) => supervisor ! ProgressProtocol.ImportedBlock(newBlock.number, internally)
case None => ()
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import io.iohk.ethereum.blockchain.sync.{BlockBroadcast, SyncProtocol}
import io.iohk.ethereum.blockchain.sync.SyncProtocol.Status
import io.iohk.ethereum.blockchain.sync.SyncProtocol.Status.Progress
import io.iohk.ethereum.blockchain.sync.regular.RegularSync.{NewCheckpoint, ProgressProtocol, ProgressState}
import io.iohk.ethereum.blockchain.sync.regular.BlockFetcher.InternalLastBlockImport
import io.iohk.ethereum.blockchain.sync.BlockBroadcast
import io.iohk.ethereum.consensus.blocks.CheckpointBlockGenerator
import io.iohk.ethereum.crypto.ECDSASignature
Expand Down Expand Up @@ -86,13 +87,20 @@ class RegularSync(

case SyncProtocol.GetStatus =>
sender() ! progressState.toStatus
case msg: ProgressProtocol =>
val newState = msg match {
case ProgressProtocol.StartedFetching => progressState.copy(startedFetching = true)
case ProgressProtocol.StartingFrom(blockNumber) =>
progressState.copy(initialBlock = blockNumber, currentBlock = blockNumber)
case ProgressProtocol.GotNewBlock(blockNumber) => progressState.copy(bestKnownNetworkBlock = blockNumber)
case ProgressProtocol.ImportedBlock(blockNumber) => progressState.copy(currentBlock = blockNumber)

case ProgressProtocol.StartedFetching =>
val newState = progressState.copy(startedFetching = true)
context become running(newState)
case ProgressProtocol.StartingFrom(blockNumber) =>
val newState = progressState.copy(initialBlock = blockNumber, currentBlock = blockNumber)
context become running(newState)
case ProgressProtocol.GotNewBlock(blockNumber) =>
val newState = progressState.copy(bestKnownNetworkBlock = blockNumber)
context become running(newState)
case ProgressProtocol.ImportedBlock(blockNumber, internally) =>
val newState = progressState.copy(currentBlock = blockNumber)
if (internally) {
fetcher ! InternalLastBlockImport(blockNumber)
}
context become running(newState)
}
Expand Down Expand Up @@ -159,6 +167,6 @@ object RegularSync {
case object StartedFetching extends ProgressProtocol
case class StartingFrom(blockNumber: BigInt) extends ProgressProtocol
case class GotNewBlock(blockNumber: BigInt) extends ProgressProtocol
case class ImportedBlock(blockNumber: BigInt) extends ProgressProtocol
case class ImportedBlock(blockNumber: BigInt, internally: Boolean) extends ProgressProtocol
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@ class BlockFetcherSpec extends TestKit(ActorSystem("BlockFetcherSpec_System")) w
val firstGetBlockBodiesResponse = BlockBodies(firstBlocksBatch.map(_.body))
peersClient.reply(PeersClient.Response(fakePeer, firstGetBlockBodiesResponse))

// Trigger sync (to be improved with ETCM-248)
triggerFetching()

// Second headers request with response pending
Expand Down Expand Up @@ -99,7 +98,6 @@ class BlockFetcherSpec extends TestKit(ActorSystem("BlockFetcherSpec_System")) w
val firstGetBlockBodiesResponse = BlockBodies(firstBlocksBatch.map(_.body))
peersClient.reply(PeersClient.Response(fakePeer, firstGetBlockBodiesResponse))

// Trigger sync (to be improved with ETCM-248)
triggerFetching()

// Second headers request with response pending
Expand Down Expand Up @@ -162,7 +160,12 @@ class BlockFetcherSpec extends TestKit(ActorSystem("BlockFetcherSpec_System")) w
blockFetcher ! BlockFetcher.Start(importer.ref, 0)

peerEventBus.expectMsg(
Subscribe(MessageClassifier(Set(NewBlock.code63, NewBlock.code64, NewBlockHashes.code), PeerSelector.AllPeers))
Subscribe(
MessageClassifier(
Set(NewBlock.code63, NewBlock.code64, NewBlockHashes.code, BlockHeaders.code),
PeerSelector.AllPeers
)
)
)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,12 +68,15 @@ class RegularSyncSpec

"Regular Sync" when {
"initializing" should {
"subscribe for new blocks and new hashes" in sync(new Fixture(testSystem) {
"subscribe for new blocks, new hashes and new block headers" in sync(new Fixture(testSystem) {
regularSync ! SyncProtocol.Start

peerEventBus.expectMsg(
PeerEventBusActor.Subscribe(
MessageClassifier(Set(NewBlock.code63, NewBlock.code64, NewBlockHashes.code), PeerSelector.AllPeers)
MessageClassifier(
Set(NewBlock.code63, NewBlock.code64, NewBlockHashes.code, BlockHeaders.code),
PeerSelector.AllPeers
)
)
)
})
Expand Down