@@ -2,8 +2,8 @@ package io.iohk.ethereum.sync
22
33import java .net .{InetSocketAddress , ServerSocket }
44import java .nio .file .Files
5- import java .util .concurrent .TimeoutException
65import java .util .concurrent .atomic .AtomicReference
6+ import java .util .concurrent .{ThreadLocalRandom , TimeoutException }
77
88import akka .actor .{ActorRef , ActorSystem }
99import akka .testkit .TestProbe
@@ -13,6 +13,7 @@ import io.iohk.ethereum.Mocks.MockValidatorsAlwaysSucceed
1313import io .iohk .ethereum .blockchain .sync .regular .BlockBroadcasterActor
1414import io .iohk .ethereum .blockchain .sync .regular .BlockBroadcasterActor .BroadcastBlock
1515import io .iohk .ethereum .blockchain .sync .{BlockBroadcast , BlockchainHostActor , FastSync , TestSyncConfig }
16+ import io .iohk .ethereum .crypto .kec256
1617import io .iohk .ethereum .db .components .{RocksDbDataSourceComponent , Storages }
1718import io .iohk .ethereum .db .dataSource .{RocksDbConfig , RocksDbDataSource }
1819import io .iohk .ethereum .db .storage .pruning .{ArchivePruning , PruningMode }
@@ -29,18 +30,11 @@ import io.iohk.ethereum.network.p2p.EthereumMessageDecoder
2930import io .iohk .ethereum .network .p2p .messages .CommonMessages .NewBlock
3031import io .iohk .ethereum .network .rlpx .AuthHandshaker
3132import io .iohk .ethereum .network .rlpx .RLPxConnectionHandler .RLPxConfiguration
32- import io .iohk .ethereum .network .{
33- EtcPeerManagerActor ,
34- ForkResolver ,
35- KnownNodesManager ,
36- PeerEventBusActor ,
37- PeerManagerActor ,
38- ServerActor
39- }
33+ import io .iohk .ethereum .network .{EtcPeerManagerActor , ForkResolver , KnownNodesManager , PeerEventBusActor , PeerManagerActor , ServerActor }
4034import io .iohk .ethereum .nodebuilder .{PruningConfigBuilder , SecureRandomBuilder }
41- import io .iohk .ethereum .sync .FastSyncItSpec .{ FakePeer , IdentityUpdate , updateStateAtBlock }
35+ import io .iohk .ethereum .sync .FastSyncItSpec ._
4236import io .iohk .ethereum .utils .ServerStatus .Listening
43- import io .iohk .ethereum .utils .{ Config , NodeStatus , ServerStatus , VmConfig }
37+ import io .iohk .ethereum .utils ._
4438import io .iohk .ethereum .vm .EvmConfig
4539import io .iohk .ethereum .{Fixtures , FlatSpecBase , Timeouts }
4640import monix .eval .Task
@@ -79,14 +73,63 @@ class FastSyncItSpec extends FlatSpecBase with Matchers with BeforeAndAfter {
7973 _ <- peer1.waitForFastSyncFinish()
8074 } yield {
8175 val trie = peer1.getBestBlockTrie()
76+ val synchronizingPeerHaveAllData = peer1.containsExpectedDataUpToAccountAtBlock(1000 , 500 )
8277 // due to the fact that function generating state is deterministic both peer2 and peer3 ends up with exactly same
8378 // state, so peer1 can get whole trie from both of them.
8479 assert(peer1.bl.getBestBlockNumber() == peer2.bl.getBestBlockNumber() - peer2.testSyncConfig.targetBlockOffset)
8580 assert(peer1.bl.getBestBlockNumber() == peer3.bl.getBestBlockNumber() - peer3.testSyncConfig.targetBlockOffset)
8681 assert(trie.isDefined)
82+ assert(synchronizingPeerHaveAllData)
8783 }
8884 }
8985
86+ it should " should sync blockchain with state nodes when peer do not response with full responses" in
87+ customTestCaseResourceM(FakePeer .start3FakePeersRes(
88+ fakePeerCustomConfig2 = FakePeerCustomConfig (HostConfig ()),
89+ fakePeerCustomConfig3 = FakePeerCustomConfig (HostConfig ()))) {
90+ case (peer1, peer2, peer3) =>
91+ for {
92+ _ <- peer2.importBlocksUntil(1000 )(updateStateAtBlock(500 ))
93+ _ <- peer3.importBlocksUntil(1000 )(updateStateAtBlock(500 ))
94+ _ <- peer1.connectToPeers(Set (peer2.node, peer3.node))
95+ _ <- peer1.startFastSync().delayExecution(50 .milliseconds)
96+ _ <- peer1.waitForFastSyncFinish()
97+ } yield {
98+ val trie = peer1.getBestBlockTrie()
99+ val synchronizingPeerHaveAllData = peer1.containsExpectedDataUpToAccountAtBlock(1000 , 500 )
100+ // due to the fact that function generating state is deterministic both peer2 and peer3 ends up with exactly same
101+ // state, so peer1 can get whole trie from both of them.
102+ assert(peer1.bl.getBestBlockNumber() == peer2.bl.getBestBlockNumber() - peer2.testSyncConfig.targetBlockOffset)
103+ assert(peer1.bl.getBestBlockNumber() == peer3.bl.getBestBlockNumber() - peer3.testSyncConfig.targetBlockOffset)
104+ assert(trie.isDefined)
105+ assert(synchronizingPeerHaveAllData)
106+ }
107+ }
108+
109+ it should " should sync blockchain with state nodes when one of the peers send empty state responses" in
110+ customTestCaseResourceM(FakePeer .start3FakePeersRes(
111+ fakePeerCustomConfig2 = FakePeerCustomConfig (HostConfig ()),
112+ fakePeerCustomConfig3 = FakePeerCustomConfig (HostConfig ().copy(maxMptComponentsPerMessage = 0 )))) {
113+ case (peer1, peer2, peer3) =>
114+ for {
115+ _ <- peer2.importBlocksUntil(1000 )(updateStateAtBlock(500 ))
116+ _ <- peer3.importBlocksUntil(1000 )(updateStateAtBlock(500 ))
117+ _ <- peer1.connectToPeers(Set (peer2.node, peer3.node))
118+ _ <- peer1.startFastSync().delayExecution(50 .milliseconds)
119+ _ <- peer1.waitForFastSyncFinish()
120+ } yield {
121+ val trie = peer1.getBestBlockTrie()
122+ val synchronizingPeerHaveAllData = peer1.containsExpectedDataUpToAccountAtBlock(1000 , 500 )
123+ // due to the fact that function generating state is deterministic both peer2 and peer3 ends up with exactly same
124+ // state, so peer1 can get whole trie from both of them.
125+ assert(peer1.bl.getBestBlockNumber() == peer2.bl.getBestBlockNumber() - peer2.testSyncConfig.targetBlockOffset)
126+ assert(peer1.bl.getBestBlockNumber() == peer3.bl.getBestBlockNumber() - peer3.testSyncConfig.targetBlockOffset)
127+ assert(trie.isDefined)
128+ assert(synchronizingPeerHaveAllData)
129+ }
130+ }
131+
132+
90133 it should " should update target block" in customTestCaseResourceM(FakePeer .start2FakePeersRes()) {
91134 case (peer1, peer2) =>
92135 for {
@@ -99,11 +142,25 @@ class FastSyncItSpec extends FlatSpecBase with Matchers with BeforeAndAfter {
99142 assert(peer1.bl.getBestBlockNumber() == peer2.bl.getBestBlockNumber() - peer2.testSyncConfig.targetBlockOffset)
100143 }
101144 }
145+
146+ it should " should update target block and sync this new target block state" in customTestCaseResourceM(FakePeer .start2FakePeersRes()) {
147+ case (peer1, peer2) =>
148+ for {
149+ _ <- peer2.importBlocksUntil(1000 )(IdentityUpdate )
150+ _ <- peer1.connectToPeers(Set (peer2.node))
151+ _ <- peer2.importBlocksUntil(2000 )(updateStateAtBlock(1500 )).startAndForget
152+ _ <- peer1.startFastSync().delayExecution(50 .milliseconds)
153+ _ <- peer1.waitForFastSyncFinish()
154+ } yield {
155+ assert(peer1.bl.getBestBlockNumber() == peer2.bl.getBestBlockNumber() - peer2.testSyncConfig.targetBlockOffset)
156+ }
157+ }
158+
102159}
103160
104161object FastSyncItSpec {
105162 private def retryUntilWithDelay [A ](source : Task [A ], delay : FiniteDuration , maxRetries : Int )(
106- predicate : A => Boolean
163+ predicate : A => Boolean
107164 ): Task [A ] = {
108165 source.delayExecution(delay).flatMap { result =>
109166 if (predicate(result)) {
@@ -155,7 +212,29 @@ object FastSyncItSpec {
155212 }
156213 }
157214
158- class FakePeer (peerName : String ) extends SecureRandomBuilder with TestSyncConfig {
215+ case class HostConfig (
216+ maxBlocksHeadersPerMessage : Int ,
217+ maxBlocksBodiesPerMessage : Int ,
218+ maxReceiptsPerMessage : Int ,
219+ maxMptComponentsPerMessage : Int ) extends FastSyncHostConfiguration
220+
221+ object HostConfig {
222+ def apply (): HostConfig = {
223+ val random : ThreadLocalRandom = ThreadLocalRandom .current()
224+ new HostConfig (
225+ maxBlocksHeadersPerMessage = random.nextInt(100 , 201 ),
226+ maxBlocksBodiesPerMessage = random.nextInt(30 , 51 ),
227+ maxReceiptsPerMessage = random.nextInt(30 , 51 ),
228+ maxMptComponentsPerMessage = random.nextInt(100 , 201 )
229+ )
230+ }
231+ }
232+
233+ final case class FakePeerCustomConfig (hostConfig : HostConfig )
234+
235+ val defaultConfig = FakePeerCustomConfig (HostConfig (200 , 200 , 200 , 200 ))
236+
237+ class FakePeer (peerName : String , fakePeerCustomConfig : FakePeerCustomConfig ) extends SecureRandomBuilder with TestSyncConfig {
159238 implicit val akkaTimeout : Timeout = Timeout (5 .second)
160239
161240 val config = Config .config
@@ -231,10 +310,10 @@ object FastSyncItSpec {
231310
232311 val peerConf = new PeerConfiguration {
233312 override val fastSyncHostConfiguration : FastSyncHostConfiguration = new FastSyncHostConfiguration {
234- val maxBlocksHeadersPerMessage : Int = 200
235- val maxBlocksBodiesPerMessage : Int = 200
236- val maxReceiptsPerMessage : Int = 200
237- val maxMptComponentsPerMessage : Int = 200
313+ val maxBlocksHeadersPerMessage : Int = fakePeerCustomConfig.hostConfig.maxBlocksHeadersPerMessage
314+ val maxBlocksBodiesPerMessage : Int = fakePeerCustomConfig.hostConfig.maxBlocksBodiesPerMessage
315+ val maxReceiptsPerMessage : Int = fakePeerCustomConfig.hostConfig.maxReceiptsPerMessage
316+ val maxMptComponentsPerMessage : Int = fakePeerCustomConfig.hostConfig.maxMptComponentsPerMessage
238317 }
239318 override val rlpxConfiguration : RLPxConfiguration = new RLPxConfiguration {
240319 override val waitForTcpAckTimeout : FiniteDuration = Timeouts .normalTimeout
@@ -394,7 +473,7 @@ object FastSyncItSpec {
394473 }
395474
396475 private def createChildBlock (parent : Block , parentTd : BigInt , parentWorld : InMemoryWorldStateProxy )(
397- updateWorldForBlock : (BigInt , InMemoryWorldStateProxy ) => InMemoryWorldStateProxy
476+ updateWorldForBlock : (BigInt , InMemoryWorldStateProxy ) => InMemoryWorldStateProxy
398477 ): (Block , BigInt , InMemoryWorldStateProxy ) = {
399478 val newBlockNumber = parent.header.number + 1
400479 val newWorld = updateWorldForBlock(newBlockNumber, parentWorld)
@@ -406,8 +485,8 @@ object FastSyncItSpec {
406485 }
407486
408487 def importBlocksUntil (
409- n : BigInt
410- )(updateWorldForBlock : (BigInt , InMemoryWorldStateProxy ) => InMemoryWorldStateProxy ): Task [Unit ] = {
488+ n : BigInt
489+ )(updateWorldForBlock : (BigInt , InMemoryWorldStateProxy ) => InMemoryWorldStateProxy ): Task [Unit ] = {
411490 Task (bl.getBestBlock()).flatMap { block =>
412491 if (block.number >= n) {
413492 Task (())
@@ -445,36 +524,75 @@ object FastSyncItSpec {
445524 )
446525 }.toOption
447526 }
527+
528+ def containsExpectedDataUpToAccountAtBlock (n : BigInt , blockNumber : BigInt ): Boolean = {
529+ def go (i : BigInt ): Boolean = {
530+ if (i >= n) {
531+ true
532+ } else {
533+ val expectedBalance = i
534+ val accountAddress = Address (i)
535+ val accountExpectedCode = ByteString (i.toByteArray)
536+ val codeHash = kec256(accountExpectedCode)
537+ val accountExpectedStorageAddresses = (i until i + 20 ).toList
538+ val account = bl.getAccount(accountAddress, blockNumber).get
539+ val code = bl.getEvmCodeByHash(codeHash).get
540+ val storedData = accountExpectedStorageAddresses.map { addr =>
541+ ByteUtils .toBigInt(bl.getAccountStorageAt(account.storageRoot, addr, ethCompatibleStorage = true ))
542+ }
543+ val haveAllStoredData = accountExpectedStorageAddresses.zip(storedData).forall { case (address, value) =>
544+ address == value
545+ }
546+
547+ val dataIsCorrect = account.balance.toBigInt == expectedBalance && code == accountExpectedCode && haveAllStoredData
548+ if (dataIsCorrect) {
549+ go(i + 1 )
550+ } else {
551+ false
552+ }
553+ }
554+ }
555+
556+ go(0 )
557+ }
448558 }
449559
450560 object FakePeer {
451- def startFakePeer (peerName : String ): Task [FakePeer ] = {
561+ def startFakePeer (peerName : String , fakePeerCustomConfig : FakePeerCustomConfig ): Task [FakePeer ] = {
452562 for {
453- peer <- Task (new FakePeer (peerName))
563+ peer <- Task (new FakePeer (peerName, fakePeerCustomConfig ))
454564 _ <- peer.startPeer()
455565 } yield peer
456566 }
457567
458- def start1FakePeerRes (): Resource [Task , FakePeer ] = {
568+ def start1FakePeerRes (fakePeerCustomConfig : FakePeerCustomConfig = defaultConfig ): Resource [Task , FakePeer ] = {
459569 Resource .make {
460- startFakePeer(" Peer1" )
570+ startFakePeer(" Peer1" , fakePeerCustomConfig )
461571 } { peer =>
462572 peer.shutdown()
463573 }
464574 }
465575
466- def start2FakePeersRes () = {
576+ def start2FakePeersRes (fakePeerCustomConfig1 : FakePeerCustomConfig = defaultConfig,
577+ fakePeerCustomConfig2 : FakePeerCustomConfig = defaultConfig) = {
467578 Resource .make {
468- Task .parZip2(startFakePeer(" Peer1" ), startFakePeer(" Peer2" ))
579+ Task .parZip2(
580+ startFakePeer(" Peer1" , fakePeerCustomConfig1),
581+ startFakePeer(" Peer2" , fakePeerCustomConfig2))
469582 } { case (peer, peer1) => Task .parMap2(peer.shutdown(), peer1.shutdown())((_, _) => ()) }
470583 }
471584
472- def start3FakePeersRes () = {
585+ def start3FakePeersRes (fakePeerCustomConfig1 : FakePeerCustomConfig = defaultConfig,
586+ fakePeerCustomConfig2 : FakePeerCustomConfig = defaultConfig,
587+ fakePeerCustomConfig3 : FakePeerCustomConfig = defaultConfig) = {
473588 Resource .make {
474- Task .parZip3(startFakePeer(" Peer1" ), startFakePeer(" Peer2" ), startFakePeer(" Peer3" ))
589+ Task .parZip3(startFakePeer(" Peer1" , fakePeerCustomConfig1),
590+ startFakePeer(" Peer2" , fakePeerCustomConfig2),
591+ startFakePeer(" Peer3" , fakePeerCustomConfig3))
475592 } { case (peer, peer1, peer2) =>
476593 Task .parMap3(peer.shutdown(), peer1.shutdown(), peer2.shutdown())((_, _, _) => ())
477594 }
478595 }
479596 }
597+
480598}
0 commit comments