diff --git a/project/Dependencies.scala b/project/Dependencies.scala index c809894c3b..93a777d361 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -97,7 +97,8 @@ object Dependencies { "org.scala-sbt.ipcsocket" % "ipcsocket" % "1.1.0", "com.google.guava" % "guava" % "29.0-jre", "org.xerial.snappy" % "snappy-java" % "1.1.7.7", - "org.web3j" % "core" % "5.0.0" % Test + "org.web3j" % "core" % "5.0.0" % Test, + "io.vavr" % "vavr" % "1.0.0-alpha-3" ) val prometheus: Seq[ModuleID] = { diff --git a/repo.nix b/repo.nix index 1dba46e028..6961fba9af 100644 --- a/repo.nix +++ b/repo.nix @@ -1688,6 +1688,22 @@ url = "https://repo1.maven.org/maven2/io/suzaku/boopickle_2.12/1.3.3/boopickle_2.12-1.3.3.pom"; sha256 = "E88B339905B0C67211E08683A95AC7DB5185BF10BC7EC91378D95389D7CD5807"; }; + "nix-public/io/vavr/vavr/1.0.0-alpha-3/vavr-1.0.0-alpha-3-javadoc.jar" = { + url = "https://repo1.maven.org/maven2/io/vavr/vavr/1.0.0-alpha-3/vavr-1.0.0-alpha-3-javadoc.jar"; + sha256 = "1F27BFD5D6187F5C57B699979ADE958C71D5A00A7C71B90ADA6F46A1FE0EFA78"; + }; + "nix-public/io/vavr/vavr/1.0.0-alpha-3/vavr-1.0.0-alpha-3-sources.jar" = { + url = "https://repo1.maven.org/maven2/io/vavr/vavr/1.0.0-alpha-3/vavr-1.0.0-alpha-3-sources.jar"; + sha256 = "CDE11815C879F2ED21437A59905F98C36F1BC4FFE1ACA2F7309A28EDEF60538A"; + }; + "nix-public/io/vavr/vavr/1.0.0-alpha-3/vavr-1.0.0-alpha-3.jar" = { + url = "https://repo1.maven.org/maven2/io/vavr/vavr/1.0.0-alpha-3/vavr-1.0.0-alpha-3.jar"; + sha256 = "D68F2A25AF5BDD4D26B2D272304040F39CB5031D91A3A295F13B9DE9EF0946B0"; + }; + "nix-public/io/vavr/vavr/1.0.0-alpha-3/vavr-1.0.0-alpha-3.pom" = { + url = "https://repo1.maven.org/maven2/io/vavr/vavr/1.0.0-alpha-3/vavr-1.0.0-alpha-3.pom"; + sha256 = "CF7C124A2C9EA71543EFD99C6A5BB0BB31C44BDF266A28F819AD1509C825ED88"; + }; "nix-public/jline/jline/2.14.6/jline-2.14.6-javadoc.jar" = { url = "https://repo1.maven.org/maven2/jline/jline/2.14.6/jline-2.14.6-javadoc.jar"; sha256 = "EBD162363C0A6CA9E52AF51D7377F78F559D3F2663DC896E0DBF3B16A0188972"; diff --git a/src/it/scala/io/iohk/ethereum/sync/FastSyncItSpec.scala b/src/it/scala/io/iohk/ethereum/sync/FastSyncItSpec.scala index 79801c4edc..880668e7c9 100644 --- a/src/it/scala/io/iohk/ethereum/sync/FastSyncItSpec.scala +++ b/src/it/scala/io/iohk/ethereum/sync/FastSyncItSpec.scala @@ -1,55 +1,18 @@ package io.iohk.ethereum.sync import java.net.{InetSocketAddress, ServerSocket} -import java.nio.file.Files -import java.util.concurrent.TimeoutException -import java.util.concurrent.atomic.AtomicReference -import akka.actor.{ActorRef, ActorSystem} -import akka.testkit.TestProbe -import akka.util.{ByteString, Timeout} -import cats.effect.Resource -import io.iohk.ethereum.Mocks.MockValidatorsAlwaysSucceed -import io.iohk.ethereum.blockchain.sync.regular.BlockBroadcasterActor -import io.iohk.ethereum.blockchain.sync.regular.BlockBroadcasterActor.BroadcastBlock -import io.iohk.ethereum.blockchain.sync.{BlockBroadcast, BlockchainHostActor, FastSync, TestSyncConfig} -import io.iohk.ethereum.db.components.{RocksDbDataSourceComponent, Storages} -import io.iohk.ethereum.db.dataSource.{RocksDbConfig, RocksDbDataSource} -import io.iohk.ethereum.db.storage.pruning.{ArchivePruning, PruningMode} -import io.iohk.ethereum.db.storage.{AppStateStorage, Namespaces} +import akka.util.ByteString +import io.iohk.ethereum.FlatSpecBase import io.iohk.ethereum.domain._ import io.iohk.ethereum.ledger.InMemoryWorldStateProxy -import io.iohk.ethereum.mpt.{HashNode, MerklePatriciaTrie, MptNode, MptTraversals} 
-import io.iohk.ethereum.network.EtcPeerManagerActor.PeerInfo
-import io.iohk.ethereum.network.PeerManagerActor.{FastSyncHostConfiguration, PeerConfiguration}
-import io.iohk.ethereum.network.discovery.Node
-import io.iohk.ethereum.network.discovery.PeerDiscoveryManager.{DiscoveredNodesInfo, DiscoveryNodeInfo}
-import io.iohk.ethereum.network.handshaker.{EtcHandshaker, EtcHandshakerConfiguration, Handshaker}
-import io.iohk.ethereum.network.p2p.EthereumMessageDecoder
-import io.iohk.ethereum.network.p2p.messages.CommonMessages.NewBlock
-import io.iohk.ethereum.network.rlpx.AuthHandshaker
-import io.iohk.ethereum.network.rlpx.RLPxConnectionHandler.RLPxConfiguration
-import io.iohk.ethereum.network.{
-  EtcPeerManagerActor,
-  ForkResolver,
-  KnownNodesManager,
-  PeerEventBusActor,
-  PeerManagerActor,
-  ServerActor
-}
-import io.iohk.ethereum.nodebuilder.{PruningConfigBuilder, SecureRandomBuilder}
-import io.iohk.ethereum.sync.FastSyncItSpec.{FakePeer, IdentityUpdate, updateStateAtBlock}
-import io.iohk.ethereum.utils.ServerStatus.Listening
-import io.iohk.ethereum.utils.{Config, NodeStatus, ServerStatus, VmConfig}
-import io.iohk.ethereum.vm.EvmConfig
-import io.iohk.ethereum.{Fixtures, FlatSpecBase, Timeouts}
-import monix.eval.Task
+import io.iohk.ethereum.sync.FastSyncItSpec._
+import io.iohk.ethereum.sync.FastSyncItSpecUtils.{FakePeer, FakePeerCustomConfig, HostConfig}
 import monix.execution.Scheduler
 import org.scalatest.BeforeAndAfter
 import org.scalatest.matchers.should.Matchers
 
 import scala.concurrent.duration._
-import scala.util.Try
 
 class FastSyncItSpec extends FlatSpecBase with Matchers with BeforeAndAfter {
   implicit val testScheduler = Scheduler.fixedPool("test", 16)
@@ -79,14 +42,66 @@ class FastSyncItSpec extends FlatSpecBase with Matchers with BeforeAndAfter {
       _ <- peer1.waitForFastSyncFinish()
     } yield {
       val trie = peer1.getBestBlockTrie()
+      val synchronizingPeerHaveAllData = peer1.containsExpectedDataUpToAccountAtBlock(1000, 500)
       // due to the fact that function generating state is deterministic both peer2 and peer3 ends up with exactly same
       // state, so peer1 can get whole trie from both of them.
       assert(peer1.bl.getBestBlockNumber() == peer2.bl.getBestBlockNumber() - peer2.testSyncConfig.pivotBlockOffset)
       assert(peer1.bl.getBestBlockNumber() == peer3.bl.getBestBlockNumber() - peer3.testSyncConfig.pivotBlockOffset)
       assert(trie.isDefined)
+      assert(synchronizingPeerHaveAllData)
     }
   }
 
+  it should "sync blockchain with state nodes when peers do not respond with full responses" in
+    customTestCaseResourceM(
+      FakePeer.start3FakePeersRes(
+        fakePeerCustomConfig2 = FakePeerCustomConfig(HostConfig()),
+        fakePeerCustomConfig3 = FakePeerCustomConfig(HostConfig())
+      )
+    ) { case (peer1, peer2, peer3) =>
+      for {
+        _ <- peer2.importBlocksUntil(1000)(updateStateAtBlock(500))
+        _ <- peer3.importBlocksUntil(1000)(updateStateAtBlock(500))
+        _ <- peer1.connectToPeers(Set(peer2.node, peer3.node))
+        _ <- peer1.startFastSync().delayExecution(50.milliseconds)
+        _ <- peer1.waitForFastSyncFinish()
+      } yield {
+        val trie = peer1.getBestBlockTrie()
+        val synchronizingPeerHaveAllData = peer1.containsExpectedDataUpToAccountAtBlock(1000, 500)
+        // due to the fact that function generating state is deterministic both peer2 and peer3 ends up with exactly same
+        // state, so peer1 can get whole trie from both of them.
+        assert(peer1.bl.getBestBlockNumber() == peer2.bl.getBestBlockNumber() - peer2.testSyncConfig.pivotBlockOffset)
+        assert(peer1.bl.getBestBlockNumber() == peer3.bl.getBestBlockNumber() - peer3.testSyncConfig.pivotBlockOffset)
+        assert(trie.isDefined)
+        assert(synchronizingPeerHaveAllData)
+      }
+    }
+
+  it should "sync blockchain with state nodes when one of the peers sends empty state responses" in
+    customTestCaseResourceM(
+      FakePeer.start3FakePeersRes(
+        fakePeerCustomConfig2 = FakePeerCustomConfig(HostConfig()),
+        fakePeerCustomConfig3 = FakePeerCustomConfig(HostConfig().copy(maxMptComponentsPerMessage = 0))
+      )
+    ) { case (peer1, peer2, peer3) =>
+      for {
+        _ <- peer2.importBlocksUntil(1000)(updateStateAtBlock(500))
+        _ <- peer3.importBlocksUntil(1000)(updateStateAtBlock(500))
+        _ <- peer1.connectToPeers(Set(peer2.node, peer3.node))
+        _ <- peer1.startFastSync().delayExecution(50.milliseconds)
+        _ <- peer1.waitForFastSyncFinish()
+      } yield {
+        val trie = peer1.getBestBlockTrie()
+        val synchronizingPeerHaveAllData = peer1.containsExpectedDataUpToAccountAtBlock(1000, 500)
+        // due to the fact that function generating state is deterministic both peer2 and peer3 ends up with exactly same
+        // state, so peer1 can get whole trie from both of them.
+        assert(peer1.bl.getBestBlockNumber() == peer2.bl.getBestBlockNumber() - peer2.testSyncConfig.pivotBlockOffset)
+        assert(peer1.bl.getBestBlockNumber() == peer3.bl.getBestBlockNumber() - peer3.testSyncConfig.pivotBlockOffset)
+        assert(trie.isDefined)
+        assert(synchronizingPeerHaveAllData)
+      }
+    }
+
   it should "should update pivot block" in customTestCaseResourceM(FakePeer.start2FakePeersRes()) {
     case (peer1, peer2) =>
       for {
@@ -99,25 +114,24 @@ class FastSyncItSpec extends FlatSpecBase with Matchers with BeforeAndAfter {
       assert(peer1.bl.getBestBlockNumber() == peer2.bl.getBestBlockNumber() - peer2.testSyncConfig.pivotBlockOffset)
     }
   }
-}
 
-object FastSyncItSpec {
-  private def retryUntilWithDelay[A](source: Task[A], delay: FiniteDuration, maxRetries: Int)(
-      predicate: A => Boolean
-  ): Task[A] = {
-    source.delayExecution(delay).flatMap { result =>
-      if (predicate(result)) {
-        Task.now(result)
-      } else {
-        if (maxRetries > 0) {
-          retryUntilWithDelay(source, delay, maxRetries - 1)(predicate)
-        } else {
-          Task.raiseError(new TimeoutException("Task time out after all retries"))
-        }
-      }
+  it should "update pivot block and sync this new pivot block state" in customTestCaseResourceM(
+    FakePeer.start2FakePeersRes()
+  ) { case (peer1, peer2) =>
+    for {
+      _ <- peer2.importBlocksUntil(1000)(IdentityUpdate)
+      _ <- peer1.connectToPeers(Set(peer2.node))
+      _ <- peer2.importBlocksUntil(2000)(updateStateAtBlock(1500)).startAndForget
+      _ <- peer1.startFastSync().delayExecution(50.milliseconds)
+      _ <- peer1.waitForFastSyncFinish()
+    } yield {
+      assert(peer1.bl.getBestBlockNumber() == peer2.bl.getBestBlockNumber() - peer2.testSyncConfig.pivotBlockOffset)
     }
   }
+}
+
+object FastSyncItSpec {
 
   def randomAddress(): InetSocketAddress = {
     val s = new ServerSocket(0)
     try {
@@ -154,327 +168,4 @@ object FastSyncItSpec {
       IdentityUpdate(blockNr, world)
     }
   }
-
-  class FakePeer(peerName: String) extends SecureRandomBuilder with TestSyncConfig {
-    implicit val akkaTimeout: Timeout = Timeout(5.second)
-
-    val config = Config.config
-
-    import scala.language.postfixOps
-
-    implicit val system = ActorSystem(peerName)
-
-    val peerDiscoveryManager = TestProbe().ref
-
-    val nodeKey = io.iohk.ethereum.crypto.generateKeyPair(secureRandom)
-
-    private val nodeStatus =
-      NodeStatus(
key = nodeKey, - serverStatus = ServerStatus.NotListening, - discoveryStatus = ServerStatus.NotListening - ) - - lazy val tempDir = Files.createTempDirectory("temp-fast-sync") - - def getRockDbTestConfig(dbPath: String) = { - new RocksDbConfig { - override val createIfMissing: Boolean = true - override val paranoidChecks: Boolean = false - override val path: String = dbPath - override val maxThreads: Int = 1 - override val maxOpenFiles: Int = 32 - override val verifyChecksums: Boolean = false - override val levelCompaction: Boolean = true - override val blockSize: Long = 16384 - override val blockCacheSize: Long = 33554432 - } - } - - sealed trait LocalPruningConfigBuilder extends PruningConfigBuilder { - override lazy val pruningMode: PruningMode = ArchivePruning - } - - lazy val nodeStatusHolder = new AtomicReference(nodeStatus) - lazy val storagesInstance = new RocksDbDataSourceComponent - with LocalPruningConfigBuilder - with Storages.DefaultStorages { - override lazy val dataSource: RocksDbDataSource = - RocksDbDataSource(getRockDbTestConfig(tempDir.toAbsolutePath.toString), Namespaces.nsSeq) - } - lazy val blockchainConfig = Config.blockchains.blockchainConfig - - /** - * Default persist interval is 20s, which is too long for tests. As in all tests we treat peer as connected when - * it is persisted in storage. - */ - lazy val knownNodesManagerConfig = - KnownNodesManager.KnownNodesManagerConfig(config).copy(persistInterval = 1.seconds) - - lazy val knownNodesManager = system.actorOf( - KnownNodesManager.props( - knownNodesManagerConfig, - storagesInstance.storages.knownNodesStorage - ) - ) - - val bl = BlockchainImpl(storagesInstance.storages) - - val genesis = Block( - Fixtures.Blocks.Genesis.header.copy(stateRoot = ByteString(MerklePatriciaTrie.EmptyRootHash)), - Fixtures.Blocks.Genesis.body - ) - - bl.save(genesis, Seq(), genesis.header.difficulty, saveAsBestBlock = true) - - lazy val nh = nodeStatusHolder - - val peerConf = new PeerConfiguration { - override val fastSyncHostConfiguration: FastSyncHostConfiguration = new FastSyncHostConfiguration { - val maxBlocksHeadersPerMessage: Int = 200 - val maxBlocksBodiesPerMessage: Int = 200 - val maxReceiptsPerMessage: Int = 200 - val maxMptComponentsPerMessage: Int = 200 - } - override val rlpxConfiguration: RLPxConfiguration = new RLPxConfiguration { - override val waitForTcpAckTimeout: FiniteDuration = Timeouts.normalTimeout - override val waitForHandshakeTimeout: FiniteDuration = Timeouts.normalTimeout - } - override val waitForHelloTimeout: FiniteDuration = 3 seconds - override val waitForStatusTimeout: FiniteDuration = 30 seconds - override val waitForChainCheckTimeout: FiniteDuration = 15 seconds - override val connectMaxRetries: Int = 3 - override val connectRetryDelay: FiniteDuration = 1 second - override val disconnectPoisonPillTimeout: FiniteDuration = 3 seconds - override val maxOutgoingPeers = 10 - override val maxIncomingPeers = 5 - override val maxPendingPeers = 5 - override val networkId: Int = 1 - - override val updateNodesInitialDelay: FiniteDuration = 5.seconds - override val updateNodesInterval: FiniteDuration = 20.seconds - override val shortBlacklistDuration: FiniteDuration = 1.minute - override val longBlacklistDuration: FiniteDuration = 3.minutes - } - - lazy val peerEventBus = system.actorOf(PeerEventBusActor.props, "peer-event-bus") - - private val handshakerConfiguration: EtcHandshakerConfiguration = - new EtcHandshakerConfiguration { - override val forkResolverOpt: Option[ForkResolver] = None - override val 
nodeStatusHolder: AtomicReference[NodeStatus] = nh - override val peerConfiguration: PeerConfiguration = peerConf - override val blockchain: Blockchain = bl - override val appStateStorage: AppStateStorage = storagesInstance.storages.appStateStorage - } - - lazy val handshaker: Handshaker[PeerInfo] = EtcHandshaker(handshakerConfiguration) - - lazy val authHandshaker: AuthHandshaker = AuthHandshaker(nodeKey, secureRandom) - - lazy val peerManager: ActorRef = system.actorOf( - PeerManagerActor.props( - peerDiscoveryManager, - Config.Network.peer, - peerEventBus, - knownNodesManager, - handshaker, - authHandshaker, - EthereumMessageDecoder - ), - "peer-manager" - ) - - lazy val etcPeerManager: ActorRef = system.actorOf( - EtcPeerManagerActor.props(peerManager, peerEventBus, storagesInstance.storages.appStateStorage, None), - "etc-peer-manager" - ) - - val blockchainHost: ActorRef = - system.actorOf(BlockchainHostActor.props(bl, peerConf, peerEventBus, etcPeerManager), "blockchain-host") - - lazy val server: ActorRef = system.actorOf(ServerActor.props(nodeStatusHolder, peerManager), "server") - - val listenAddress = randomAddress() - - lazy val node = - DiscoveryNodeInfo( - Node(ByteString(nodeStatus.nodeId), listenAddress.getAddress, listenAddress.getPort, listenAddress.getPort), - 1 - ) - - lazy val vmConfig = VmConfig(Config.config) - - lazy val validators = new MockValidatorsAlwaysSucceed - - val testSyncConfig = syncConfig.copy( - minPeersToChoosePivotBlock = 1, - peersScanInterval = 5.milliseconds, - blockHeadersPerRequest = 200, - blockBodiesPerRequest = 50, - receiptsPerRequest = 50, - fastSyncThrottle = 10.milliseconds, - startRetryInterval = 50.milliseconds, - nodesPerRequest = 200, - maxTargetDifference = 1, - syncRetryInterval = 50.milliseconds - ) - - lazy val broadcaster = new BlockBroadcast(etcPeerManager, testSyncConfig) - - lazy val broadcasterActor = system.actorOf( - BlockBroadcasterActor.props(broadcaster, peerEventBus, etcPeerManager, testSyncConfig, system.scheduler) - ) - - lazy val fastSync = system.actorOf( - FastSync.props( - storagesInstance.storages.fastSyncStateStorage, - storagesInstance.storages.appStateStorage, - bl, - validators, - peerEventBus, - etcPeerManager, - testSyncConfig, - system.scheduler - ) - ) - - private def getMptForBlock(block: Block) = { - bl.getWorldStateProxy( - blockNumber = block.number, - accountStartNonce = blockchainConfig.accountStartNonce, - stateRootHash = Some(block.header.stateRoot), - noEmptyAccounts = EvmConfig.forBlock(block.number, blockchainConfig).noEmptyAccounts, - ethCompatibleStorage = blockchainConfig.ethCompatibleStorage - ) - } - - private def broadcastBlock(block: Block, td: BigInt) = { - broadcasterActor ! BroadcastBlock(NewBlock(block, td)) - } - - def getCurrentState(): BlockchainState = { - val bestBlock = bl.getBestBlock() - val currentWorldState = getMptForBlock(bestBlock) - val currentTd = bl.getTotalDifficultyByHash(bestBlock.hash).get - BlockchainState(bestBlock, currentWorldState, currentTd) - } - - def startPeer(): Task[Unit] = { - for { - _ <- Task { - peerManager ! PeerManagerActor.StartConnecting - server ! 
ServerActor.StartServer(listenAddress) - } - _ <- retryUntilWithDelay(Task(nodeStatusHolder.get()), 1.second, 5) { status => - status.serverStatus == Listening(listenAddress) - } - } yield () - } - - def shutdown(): Task[Unit] = { - for { - _ <- Task.deferFuture(system.terminate()) - _ <- Task(storagesInstance.dataSource.destroy()) - } yield () - } - - def connectToPeers(nodes: Set[DiscoveryNodeInfo]): Task[Unit] = { - for { - _ <- Task { - peerManager ! DiscoveredNodesInfo(nodes) - } - _ <- retryUntilWithDelay(Task(storagesInstance.storages.knownNodesStorage.getKnownNodes()), 1.second, 5) { - knownNodes => - val requestedNodes = nodes.map(_.node.id) - val currentNodes = knownNodes.map(Node.fromUri).map(_.id) - requestedNodes.subsetOf(currentNodes) - } - } yield () - } - - private def createChildBlock(parent: Block, parentTd: BigInt, parentWorld: InMemoryWorldStateProxy)( - updateWorldForBlock: (BigInt, InMemoryWorldStateProxy) => InMemoryWorldStateProxy - ): (Block, BigInt, InMemoryWorldStateProxy) = { - val newBlockNumber = parent.header.number + 1 - val newWorld = updateWorldForBlock(newBlockNumber, parentWorld) - val newBlock = parent.copy(header = - parent.header.copy(parentHash = parent.header.hash, number = newBlockNumber, stateRoot = newWorld.stateRootHash) - ) - val newTd = newBlock.header.difficulty + parentTd - (newBlock, newTd, parentWorld) - } - - def importBlocksUntil( - n: BigInt - )(updateWorldForBlock: (BigInt, InMemoryWorldStateProxy) => InMemoryWorldStateProxy): Task[Unit] = { - Task(bl.getBestBlock()).flatMap { block => - if (block.number >= n) { - Task(()) - } else { - Task { - val currentTd = bl.getTotalDifficultyByHash(block.hash).get - val currentWolrd = getMptForBlock(block) - val (newBlock, newTd, newWorld) = createChildBlock(block, currentTd, currentWolrd)(updateWorldForBlock) - bl.save(newBlock, Seq(), newTd, saveAsBestBlock = true) - bl.persistCachedNodes() - broadcastBlock(newBlock, newTd) - }.flatMap(_ => importBlocksUntil(n)(updateWorldForBlock)) - } - } - } - - def startFastSync(): Task[Unit] = Task { - fastSync ! 
FastSync.Start - } - - def waitForFastSyncFinish(): Task[Boolean] = { - retryUntilWithDelay(Task(storagesInstance.storages.appStateStorage.isFastSyncDone()), 1.second, 90) { isDone => - isDone - } - } - - // Reads whole trie into memory, if the trie lacks nodes in storage it will be None - def getBestBlockTrie(): Option[MptNode] = { - Try { - val bestBlock = bl.getBestBlock() - val bestStateRoot = bestBlock.header.stateRoot - MptTraversals.parseTrieIntoMemory( - HashNode(bestStateRoot.toArray), - storagesInstance.storages.stateStorage.getBackingStorage(bestBlock.number) - ) - }.toOption - } - } - - object FakePeer { - def startFakePeer(peerName: String): Task[FakePeer] = { - for { - peer <- Task(new FakePeer(peerName)) - _ <- peer.startPeer() - } yield peer - } - - def start1FakePeerRes(): Resource[Task, FakePeer] = { - Resource.make { - startFakePeer("Peer1") - } { peer => - peer.shutdown() - } - } - - def start2FakePeersRes() = { - Resource.make { - Task.parZip2(startFakePeer("Peer1"), startFakePeer("Peer2")) - } { case (peer, peer1) => Task.parMap2(peer.shutdown(), peer1.shutdown())((_, _) => ()) } - } - - def start3FakePeersRes() = { - Resource.make { - Task.parZip3(startFakePeer("Peer1"), startFakePeer("Peer2"), startFakePeer("Peer3")) - } { case (peer, peer1, peer2) => - Task.parMap3(peer.shutdown(), peer1.shutdown(), peer2.shutdown())((_, _, _) => ()) - } - } - } } diff --git a/src/it/scala/io/iohk/ethereum/sync/FastSyncItSpecUtils.scala b/src/it/scala/io/iohk/ethereum/sync/FastSyncItSpecUtils.scala new file mode 100644 index 0000000000..1d0f070cba --- /dev/null +++ b/src/it/scala/io/iohk/ethereum/sync/FastSyncItSpecUtils.scala @@ -0,0 +1,465 @@ +package io.iohk.ethereum.sync + +import java.nio.file.Files +import java.util.concurrent.{ThreadLocalRandom, TimeoutException} +import java.util.concurrent.atomic.AtomicReference + +import akka.actor.{ActorRef, ActorSystem} +import akka.testkit.TestProbe +import akka.util.{ByteString, Timeout} +import cats.effect.Resource +import io.iohk.ethereum.Mocks.MockValidatorsAlwaysSucceed +import io.iohk.ethereum.{Fixtures, Timeouts} +import io.iohk.ethereum.blockchain.sync.{BlockBroadcast, BlockchainHostActor, FastSync, TestSyncConfig} +import io.iohk.ethereum.blockchain.sync.regular.BlockBroadcasterActor +import io.iohk.ethereum.blockchain.sync.regular.BlockBroadcasterActor.BroadcastBlock +import io.iohk.ethereum.crypto.kec256 +import io.iohk.ethereum.db.components.{RocksDbDataSourceComponent, Storages} +import io.iohk.ethereum.db.dataSource.{RocksDbConfig, RocksDbDataSource} +import io.iohk.ethereum.db.storage.{AppStateStorage, Namespaces} +import io.iohk.ethereum.db.storage.pruning.{ArchivePruning, PruningMode} +import io.iohk.ethereum.domain.{Address, Block, Blockchain, BlockchainImpl} +import io.iohk.ethereum.ledger.InMemoryWorldStateProxy +import io.iohk.ethereum.mpt.{HashNode, MerklePatriciaTrie, MptNode, MptTraversals} +import io.iohk.ethereum.network.EtcPeerManagerActor.PeerInfo +import io.iohk.ethereum.network.PeerManagerActor.{FastSyncHostConfiguration, PeerConfiguration} +import io.iohk.ethereum.network.{ + EtcPeerManagerActor, + ForkResolver, + KnownNodesManager, + PeerEventBusActor, + PeerManagerActor, + ServerActor +} +import io.iohk.ethereum.network.discovery.Node +import io.iohk.ethereum.network.discovery.PeerDiscoveryManager.{DiscoveredNodesInfo, DiscoveryNodeInfo} +import io.iohk.ethereum.network.handshaker.{EtcHandshaker, EtcHandshakerConfiguration, Handshaker} +import io.iohk.ethereum.network.p2p.EthereumMessageDecoder +import 
io.iohk.ethereum.network.p2p.messages.CommonMessages.NewBlock +import io.iohk.ethereum.network.rlpx.AuthHandshaker +import io.iohk.ethereum.network.rlpx.RLPxConnectionHandler.RLPxConfiguration +import io.iohk.ethereum.nodebuilder.{PruningConfigBuilder, SecureRandomBuilder} +import io.iohk.ethereum.sync.FastSyncItSpec.{BlockchainState, randomAddress} +import io.iohk.ethereum.sync.FastSyncItSpecUtils.FakePeerCustomConfig.defaultConfig +import io.iohk.ethereum.utils.ServerStatus.Listening +import io.iohk.ethereum.utils.{ByteUtils, Config, NodeStatus, ServerStatus, VmConfig} +import io.iohk.ethereum.vm.EvmConfig +import monix.eval.Task + +import scala.annotation.tailrec +import scala.concurrent.duration.FiniteDuration +import scala.util.Try +import scala.concurrent.duration._ +object FastSyncItSpecUtils { + private def retryUntilWithDelay[A](source: Task[A], delay: FiniteDuration, maxRetries: Int)( + predicate: A => Boolean + ): Task[A] = { + source.delayExecution(delay).flatMap { result => + if (predicate(result)) { + Task.now(result) + } else { + if (maxRetries > 0) { + retryUntilWithDelay(source, delay, maxRetries - 1)(predicate) + } else { + Task.raiseError(new TimeoutException("Task time out after all retries")) + } + } + } + } + + case class HostConfig( + maxBlocksHeadersPerMessage: Int, + maxBlocksBodiesPerMessage: Int, + maxReceiptsPerMessage: Int, + maxMptComponentsPerMessage: Int + ) extends FastSyncHostConfiguration + + object HostConfig { + def apply(): HostConfig = { + val random: ThreadLocalRandom = ThreadLocalRandom.current() + new HostConfig( + maxBlocksHeadersPerMessage = random.nextInt(100, 201), + maxBlocksBodiesPerMessage = random.nextInt(30, 51), + maxReceiptsPerMessage = random.nextInt(30, 51), + maxMptComponentsPerMessage = random.nextInt(100, 201) + ) + } + } + + final case class FakePeerCustomConfig(hostConfig: HostConfig) + + object FakePeerCustomConfig { + val defaultConfig = FakePeerCustomConfig(HostConfig(200, 200, 200, 200)) + } + + class FakePeer(peerName: String, fakePeerCustomConfig: FakePeerCustomConfig) + extends SecureRandomBuilder + with TestSyncConfig { + implicit val akkaTimeout: Timeout = Timeout(5.second) + + val config = Config.config + + import scala.language.postfixOps + + implicit val system = ActorSystem(peerName) + + val peerDiscoveryManager = TestProbe().ref + + val nodeKey = io.iohk.ethereum.crypto.generateKeyPair(secureRandom) + + private val nodeStatus = + NodeStatus( + key = nodeKey, + serverStatus = ServerStatus.NotListening, + discoveryStatus = ServerStatus.NotListening + ) + + lazy val tempDir = Files.createTempDirectory("temp-fast-sync") + + def getRockDbTestConfig(dbPath: String) = { + new RocksDbConfig { + override val createIfMissing: Boolean = true + override val paranoidChecks: Boolean = false + override val path: String = dbPath + override val maxThreads: Int = 1 + override val maxOpenFiles: Int = 32 + override val verifyChecksums: Boolean = false + override val levelCompaction: Boolean = true + override val blockSize: Long = 16384 + override val blockCacheSize: Long = 33554432 + } + } + + sealed trait LocalPruningConfigBuilder extends PruningConfigBuilder { + override lazy val pruningMode: PruningMode = ArchivePruning + } + + lazy val nodeStatusHolder = new AtomicReference(nodeStatus) + lazy val storagesInstance = new RocksDbDataSourceComponent + with LocalPruningConfigBuilder + with Storages.DefaultStorages { + override lazy val dataSource: RocksDbDataSource = + 
RocksDbDataSource(getRockDbTestConfig(tempDir.toAbsolutePath.toString), Namespaces.nsSeq) + } + lazy val blockchainConfig = Config.blockchains.blockchainConfig + + /** + * Default persist interval is 20s, which is too long for tests. As in all tests we treat peer as connected when + * it is persisted in storage. + */ + lazy val knownNodesManagerConfig = + KnownNodesManager.KnownNodesManagerConfig(config).copy(persistInterval = 1.seconds) + + lazy val knownNodesManager = system.actorOf( + KnownNodesManager.props( + knownNodesManagerConfig, + storagesInstance.storages.knownNodesStorage + ) + ) + + val bl = BlockchainImpl(storagesInstance.storages) + + val genesis = Block( + Fixtures.Blocks.Genesis.header.copy(stateRoot = ByteString(MerklePatriciaTrie.EmptyRootHash)), + Fixtures.Blocks.Genesis.body + ) + + bl.save(genesis, Seq(), genesis.header.difficulty, saveAsBestBlock = true) + + lazy val nh = nodeStatusHolder + + val peerConf = new PeerConfiguration { + override val fastSyncHostConfiguration: FastSyncHostConfiguration = new FastSyncHostConfiguration { + val maxBlocksHeadersPerMessage: Int = fakePeerCustomConfig.hostConfig.maxBlocksHeadersPerMessage + val maxBlocksBodiesPerMessage: Int = fakePeerCustomConfig.hostConfig.maxBlocksBodiesPerMessage + val maxReceiptsPerMessage: Int = fakePeerCustomConfig.hostConfig.maxReceiptsPerMessage + val maxMptComponentsPerMessage: Int = fakePeerCustomConfig.hostConfig.maxMptComponentsPerMessage + } + override val rlpxConfiguration: RLPxConfiguration = new RLPxConfiguration { + override val waitForTcpAckTimeout: FiniteDuration = Timeouts.normalTimeout + override val waitForHandshakeTimeout: FiniteDuration = Timeouts.normalTimeout + } + override val waitForHelloTimeout: FiniteDuration = 3 seconds + override val waitForStatusTimeout: FiniteDuration = 30 seconds + override val waitForChainCheckTimeout: FiniteDuration = 15 seconds + override val connectMaxRetries: Int = 3 + override val connectRetryDelay: FiniteDuration = 1 second + override val disconnectPoisonPillTimeout: FiniteDuration = 3 seconds + override val maxOutgoingPeers = 10 + override val maxIncomingPeers = 5 + override val maxPendingPeers = 5 + override val networkId: Int = 1 + + override val updateNodesInitialDelay: FiniteDuration = 5.seconds + override val updateNodesInterval: FiniteDuration = 20.seconds + override val shortBlacklistDuration: FiniteDuration = 1.minute + override val longBlacklistDuration: FiniteDuration = 3.minutes + } + + lazy val peerEventBus = system.actorOf(PeerEventBusActor.props, "peer-event-bus") + + private val handshakerConfiguration: EtcHandshakerConfiguration = + new EtcHandshakerConfiguration { + override val forkResolverOpt: Option[ForkResolver] = None + override val nodeStatusHolder: AtomicReference[NodeStatus] = nh + override val peerConfiguration: PeerConfiguration = peerConf + override val blockchain: Blockchain = bl + override val appStateStorage: AppStateStorage = storagesInstance.storages.appStateStorage + } + + lazy val handshaker: Handshaker[PeerInfo] = EtcHandshaker(handshakerConfiguration) + + lazy val authHandshaker: AuthHandshaker = AuthHandshaker(nodeKey, secureRandom) + + lazy val peerManager: ActorRef = system.actorOf( + PeerManagerActor.props( + peerDiscoveryManager, + Config.Network.peer, + peerEventBus, + knownNodesManager, + handshaker, + authHandshaker, + EthereumMessageDecoder + ), + "peer-manager" + ) + + lazy val etcPeerManager: ActorRef = system.actorOf( + EtcPeerManagerActor.props(peerManager, peerEventBus, 
storagesInstance.storages.appStateStorage, None), + "etc-peer-manager" + ) + + val blockchainHost: ActorRef = + system.actorOf(BlockchainHostActor.props(bl, peerConf, peerEventBus, etcPeerManager), "blockchain-host") + + lazy val server: ActorRef = system.actorOf(ServerActor.props(nodeStatusHolder, peerManager), "server") + + val listenAddress = randomAddress() + + lazy val node = + DiscoveryNodeInfo( + Node(ByteString(nodeStatus.nodeId), listenAddress.getAddress, listenAddress.getPort, listenAddress.getPort), + 1 + ) + + lazy val vmConfig = VmConfig(Config.config) + + lazy val validators = new MockValidatorsAlwaysSucceed + + val testSyncConfig = syncConfig.copy( + minPeersToChoosePivotBlock = 1, + peersScanInterval = 5.milliseconds, + blockHeadersPerRequest = 200, + blockBodiesPerRequest = 50, + receiptsPerRequest = 50, + fastSyncThrottle = 10.milliseconds, + startRetryInterval = 50.milliseconds, + nodesPerRequest = 200, + maxTargetDifference = 1, + syncRetryInterval = 50.milliseconds + ) + + lazy val broadcaster = new BlockBroadcast(etcPeerManager, testSyncConfig) + + lazy val broadcasterActor = system.actorOf( + BlockBroadcasterActor.props(broadcaster, peerEventBus, etcPeerManager, testSyncConfig, system.scheduler) + ) + + lazy val fastSync = system.actorOf( + FastSync.props( + storagesInstance.storages.fastSyncStateStorage, + storagesInstance.storages.appStateStorage, + bl, + validators, + peerEventBus, + etcPeerManager, + testSyncConfig, + system.scheduler + ) + ) + + private def getMptForBlock(block: Block) = { + bl.getWorldStateProxy( + blockNumber = block.number, + accountStartNonce = blockchainConfig.accountStartNonce, + stateRootHash = Some(block.header.stateRoot), + noEmptyAccounts = EvmConfig.forBlock(block.number, blockchainConfig).noEmptyAccounts, + ethCompatibleStorage = blockchainConfig.ethCompatibleStorage + ) + } + + private def broadcastBlock(block: Block, td: BigInt) = { + broadcasterActor ! BroadcastBlock(NewBlock(block, td)) + } + + def getCurrentState(): BlockchainState = { + val bestBlock = bl.getBestBlock() + val currentWorldState = getMptForBlock(bestBlock) + val currentTd = bl.getTotalDifficultyByHash(bestBlock.hash).get + BlockchainState(bestBlock, currentWorldState, currentTd) + } + + def startPeer(): Task[Unit] = { + for { + _ <- Task { + peerManager ! PeerManagerActor.StartConnecting + server ! ServerActor.StartServer(listenAddress) + } + _ <- retryUntilWithDelay(Task(nodeStatusHolder.get()), 1.second, 5) { status => + status.serverStatus == Listening(listenAddress) + } + } yield () + } + + def shutdown(): Task[Unit] = { + for { + _ <- Task.deferFuture(system.terminate()) + _ <- Task(storagesInstance.dataSource.destroy()) + } yield () + } + + def connectToPeers(nodes: Set[DiscoveryNodeInfo]): Task[Unit] = { + for { + _ <- Task { + peerManager ! 
DiscoveredNodesInfo(nodes) + } + _ <- retryUntilWithDelay(Task(storagesInstance.storages.knownNodesStorage.getKnownNodes()), 1.second, 5) { + knownNodes => + val requestedNodes = nodes.map(_.node.id) + val currentNodes = knownNodes.map(Node.fromUri).map(_.id) + requestedNodes.subsetOf(currentNodes) + } + } yield () + } + + private def createChildBlock(parent: Block, parentTd: BigInt, parentWorld: InMemoryWorldStateProxy)( + updateWorldForBlock: (BigInt, InMemoryWorldStateProxy) => InMemoryWorldStateProxy + ): (Block, BigInt, InMemoryWorldStateProxy) = { + val newBlockNumber = parent.header.number + 1 + val newWorld = updateWorldForBlock(newBlockNumber, parentWorld) + val newBlock = parent.copy(header = + parent.header.copy(parentHash = parent.header.hash, number = newBlockNumber, stateRoot = newWorld.stateRootHash) + ) + val newTd = newBlock.header.difficulty + parentTd + (newBlock, newTd, parentWorld) + } + + def importBlocksUntil( + n: BigInt + )(updateWorldForBlock: (BigInt, InMemoryWorldStateProxy) => InMemoryWorldStateProxy): Task[Unit] = { + Task(bl.getBestBlock()).flatMap { block => + if (block.number >= n) { + Task(()) + } else { + Task { + val currentTd = bl.getTotalDifficultyByHash(block.hash).get + val currentWolrd = getMptForBlock(block) + val (newBlock, newTd, newWorld) = createChildBlock(block, currentTd, currentWolrd)(updateWorldForBlock) + bl.save(newBlock, Seq(), newTd, saveAsBestBlock = true) + bl.persistCachedNodes() + broadcastBlock(newBlock, newTd) + }.flatMap(_ => importBlocksUntil(n)(updateWorldForBlock)) + } + } + } + + def startFastSync(): Task[Unit] = Task { + fastSync ! FastSync.Start + } + + def waitForFastSyncFinish(): Task[Boolean] = { + retryUntilWithDelay(Task(storagesInstance.storages.appStateStorage.isFastSyncDone()), 1.second, 90) { isDone => + isDone + } + } + + // Reads whole trie into memory, if the trie lacks nodes in storage it will be None + def getBestBlockTrie(): Option[MptNode] = { + Try { + val bestBlock = bl.getBestBlock() + val bestStateRoot = bestBlock.header.stateRoot + MptTraversals.parseTrieIntoMemory( + HashNode(bestStateRoot.toArray), + storagesInstance.storages.stateStorage.getBackingStorage(bestBlock.number) + ) + }.toOption + } + + def containsExpectedDataUpToAccountAtBlock(n: BigInt, blockNumber: BigInt): Boolean = { + @tailrec + def go(i: BigInt): Boolean = { + if (i >= n) { + true + } else { + val expectedBalance = i + val accountAddress = Address(i) + val accountExpectedCode = ByteString(i.toByteArray) + val codeHash = kec256(accountExpectedCode) + val accountExpectedStorageAddresses = (i until i + 20).toList + val account = bl.getAccount(accountAddress, blockNumber).get + val code = bl.getEvmCodeByHash(codeHash).get + val storedData = accountExpectedStorageAddresses.map { addr => + ByteUtils.toBigInt(bl.getAccountStorageAt(account.storageRoot, addr, ethCompatibleStorage = true)) + } + val haveAllStoredData = accountExpectedStorageAddresses.zip(storedData).forall { case (address, value) => + address == value + } + + val dataIsCorrect = + account.balance.toBigInt == expectedBalance && code == accountExpectedCode && haveAllStoredData + if (dataIsCorrect) { + go(i + 1) + } else { + false + } + } + } + + go(0) + } + } + + object FakePeer { + + def startFakePeer(peerName: String, fakePeerCustomConfig: FakePeerCustomConfig): Task[FakePeer] = { + for { + peer <- Task(new FakePeer(peerName, fakePeerCustomConfig)) + _ <- peer.startPeer() + } yield peer + } + + def start1FakePeerRes( + fakePeerCustomConfig: FakePeerCustomConfig = 
defaultConfig, + name: String + ): Resource[Task, FakePeer] = { + Resource.make { + startFakePeer(name, fakePeerCustomConfig) + } { peer => + peer.shutdown() + } + } + + def start2FakePeersRes( + fakePeerCustomConfig1: FakePeerCustomConfig = defaultConfig, + fakePeerCustomConfig2: FakePeerCustomConfig = defaultConfig + ): Resource[Task, (FakePeer, FakePeer)] = { + for { + peer1 <- start1FakePeerRes(fakePeerCustomConfig1, "Peer1") + peer2 <- start1FakePeerRes(fakePeerCustomConfig2, "Peer2") + } yield (peer1, peer2) + } + + def start3FakePeersRes( + fakePeerCustomConfig1: FakePeerCustomConfig = defaultConfig, + fakePeerCustomConfig2: FakePeerCustomConfig = defaultConfig, + fakePeerCustomConfig3: FakePeerCustomConfig = defaultConfig + ): Resource[Task, (FakePeer, FakePeer, FakePeer)] = { + for { + peer1 <- start1FakePeerRes(fakePeerCustomConfig1, "Peer1") + peer2 <- start1FakePeerRes(fakePeerCustomConfig2, "Peer2") + peer3 <- start1FakePeerRes(fakePeerCustomConfig3, "Peer3") + } yield (peer1, peer2, peer3) + } + } + +} diff --git a/src/main/resources/application.conf b/src/main/resources/application.conf index 4815c598ec..3f7978ed86 100644 --- a/src/main/resources/application.conf +++ b/src/main/resources/application.conf @@ -329,7 +329,7 @@ mantis { # During fast-sync when most up to date block is determined from peers, the actual target block number # will be decreased by this value - pivot-block-offset = 128 + pivot-block-offset = 32 # How often to query peers for new blocks after the top of the chain has been reached check-for-new-block-interval = 10.seconds diff --git a/src/main/scala/io/iohk/ethereum/blockchain/sync/FastSync.scala b/src/main/scala/io/iohk/ethereum/blockchain/sync/FastSync.scala index 9c4f9ba7e6..f3ca11c3cb 100644 --- a/src/main/scala/io/iohk/ethereum/blockchain/sync/FastSync.scala +++ b/src/main/scala/io/iohk/ethereum/blockchain/sync/FastSync.scala @@ -7,14 +7,13 @@ import akka.util.ByteString import io.iohk.ethereum.blockchain.sync.FastSyncReceiptsValidator.ReceiptsValidationResult import io.iohk.ethereum.blockchain.sync.PeerRequestHandler.ResponseReceived import io.iohk.ethereum.blockchain.sync.SyncBlocksValidator.BlockBodyValidationResult +import io.iohk.ethereum.blockchain.sync.SyncStateSchedulerActor.{StartSyncingTo, StateSyncFinished} import io.iohk.ethereum.consensus.validators.Validators -import io.iohk.ethereum.crypto.kec256 import io.iohk.ethereum.db.storage.{AppStateStorage, FastSyncStateStorage} import io.iohk.ethereum.domain._ -import io.iohk.ethereum.mpt.{BranchNode, ExtensionNode, HashNode, LeafNode, MerklePatriciaTrie, MptNode} +import io.iohk.ethereum.mpt.MerklePatriciaTrie import io.iohk.ethereum.network.Peer import io.iohk.ethereum.network.p2p.messages.PV62._ -import io.iohk.ethereum.network.p2p.messages.PV63.MptNodeEncoders._ import io.iohk.ethereum.network.p2p.messages.PV63._ import io.iohk.ethereum.utils.ByteStringUtils import io.iohk.ethereum.utils.Config.SyncConfig @@ -23,7 +22,7 @@ import org.bouncycastle.util.encoders.Hex import scala.annotation.tailrec import scala.concurrent.ExecutionContext.Implicits.global import scala.concurrent.duration.{FiniteDuration, _} -import scala.util.{Failure, Random, Success, Try} +import scala.util.Random // scalastyle:off file.size.limit class FastSync( @@ -80,6 +79,11 @@ class FastSync( ) val syncingHandler = new SyncingHandler(syncState) context become syncingHandler.receive + if (syncState.isBlockchainWorkFinished && !syncState.stateSyncFinished) { + log.info(s"Blockchain sync was completed, starting state 
sync to block ${syncState.pivotBlock.idTag}") + // chain has already been downloaded we can start state sync + syncingHandler.startStateSync(syncState.pivotBlock) + } syncingHandler.processSyncing() } } @@ -121,14 +125,22 @@ class FastSync( private var assignedHandlers: Map[ActorRef, Peer] = Map.empty private var peerRequestsTime: Map[Peer, Instant] = Map.empty - private var requestedMptNodes: Map[ActorRef, Seq[HashType]] = Map.empty - private var requestedNonMptNodes: Map[ActorRef, Seq[HashType]] = Map.empty private var requestedBlockBodies: Map[ActorRef, Seq[ByteString]] = Map.empty private var requestedReceipts: Map[ActorRef, Seq[ByteString]] = Map.empty private val syncStateStorageActor = context.actorOf(Props[FastSyncStateStorageActor], "state-storage") + syncStateStorageActor ! fastSyncStateStorage + private val syncStateDownloader = context.actorOf( + SyncStateDownloaderActor.props(etcPeerManager, peerEventBus, syncConfig, scheduler), + "state-downloader" + ) + private val syncStateScheduler = context.actorOf( + SyncStateSchedulerActor.props(syncStateDownloader, SyncStateScheduler(blockchain)), + "state-scheduler" + ) + //Delay before starting to persist snapshot. It should be 0, as the presence of it marks that fast sync was started private val persistStateSnapshotDelay: FiniteDuration = 0.seconds private val syncStatePersistCancellable = @@ -138,11 +150,18 @@ class FastSync( private val heartBeat = scheduler.scheduleWithFixedDelay(syncRetryInterval, syncRetryInterval * 2, self, ProcessSyncing) + def startStateSync(targetBlockHeader: BlockHeader): Unit = { + syncStateScheduler ! StartSyncingTo(targetBlockHeader.stateRoot, targetBlockHeader.number) + } + def receive: Receive = handleCommonMessages orElse { case UpdatePivotBlock(state) => updatePivotBlock(state) case ProcessSyncing => processSyncing() case PrintStatus => printStatus() case PersistSyncState => persistSyncState() + case StateSyncFinished => + syncState = syncState.copy(stateSyncFinished = true) + processSyncing() case ResponseReceived(peer, BlockHeaders(blockHeaders), timeTaken) => log.info("*** Received {} block headers in {} ms ***", blockHeaders.size, timeTaken) @@ -171,15 +190,6 @@ class FastSync( removeRequestHandler(sender()) handleReceipts(peer, requestedHashes, receipts) - case ResponseReceived(peer, nodeData: NodeData, timeTaken) => - log.info("Received {} state nodes in {} ms", nodeData.values.size, timeTaken) - val requestedHashes = - requestedMptNodes.getOrElse(sender(), Nil) ++ requestedNonMptNodes.getOrElse(sender(), Nil) - requestedMptNodes -= sender() - requestedNonMptNodes -= sender() - removeRequestHandler(sender()) - handleNodeData(peer, requestedHashes, nodeData) - case PeerRequestHandler.RequestFailed(peer, reason) => handleRequestFailure(peer, sender(), reason) @@ -206,17 +216,19 @@ class FastSync( } private def updatePivotBlock(state: FinalBlockProcessingResult): Unit = { - syncState = syncState.copy(updatingPivotBlock = true) if (syncState.pivotBlockUpdateFailures <= syncConfig.maximumTargetUpdateFailures) { - if (assignedHandlers.nonEmpty) { + if (assignedHandlers.nonEmpty || syncState.blockChainWorkQueued) { log.info(s"Still waiting for some responses, rescheduling pivot block update") - scheduler.scheduleOnce(syncRetryInterval, self, UpdatePivotBlock(state)) + scheduler.scheduleOnce(1.second, self, UpdatePivotBlock(state)) + processSyncing() } else { + syncState = syncState.copy(updatingPivotBlock = true) log.info("Asking for new pivot block") - val pivotBlockSelector = + val 
pivotBlockSelector = { context.actorOf( PivotBlockSelector.props(etcPeerManager, peerEventBus, syncConfig, scheduler, context.self) ) + } pivotBlockSelector ! PivotBlockSelector.SelectPivotBlock context become waitingForPivotBlockUpdate(state) } @@ -228,13 +240,25 @@ class FastSync( private def updatePivotSyncState(state: FinalBlockProcessingResult, pivotBlockHeader: BlockHeader): Unit = state match { - case ImportedLastBlock - if pivotBlockHeader.number - syncState.pivotBlock.number <= syncConfig.maxTargetDifference => - log.info(s"Current pivot block is fresh enough, starting state download") - if (syncState.pivotBlock.stateRoot == ByteString(MerklePatriciaTrie.EmptyRootHash)) { - syncState = syncState.copy(pendingMptNodes = Seq()) + case ImportedLastBlock => + if (pivotBlockHeader.number - syncState.pivotBlock.number <= syncConfig.maxTargetDifference) { + log.info(s"Current pivot block is fresh enough, starting state download") + // Empty root has means that there were no transactions in blockchain, and Mpt trie is empty + // Asking for this root would result only with empty transactions + if (syncState.pivotBlock.stateRoot == ByteString(MerklePatriciaTrie.EmptyRootHash)) { + syncState = syncState.copy(stateSyncFinished = true) + } else { + syncStateScheduler ! StartSyncingTo(pivotBlockHeader.stateRoot, pivotBlockHeader.number) + } } else { - syncState = syncState.copy(pendingMptNodes = Seq(StateMptNodeHash(syncState.pivotBlock.stateRoot))) + syncState = syncState.updatePivotBlock( + pivotBlockHeader, + syncConfig.fastSyncBlockValidationX, + updateFailures = false + ) + log.info( + s"Changing pivot block to ${pivotBlockHeader.number}, new safe target is ${syncState.safeDownloadTarget}" + ) } case LastBlockValidationFailed => @@ -243,16 +267,6 @@ class FastSync( ) syncState = syncState.updatePivotBlock(pivotBlockHeader, syncConfig.fastSyncBlockValidationX, updateFailures = true) - - case _ => - log.info( - s"Changing pivot block to ${pivotBlockHeader.number}, new safe target is ${syncState.safeDownloadTarget}" - ) - syncState = syncState.updatePivotBlock( - pivotBlockHeader, - syncConfig.fastSyncBlockValidationX, - updateFailures = false - ) } private def removeRequestHandler(handler: ActorRef): Unit = { @@ -376,7 +390,7 @@ class FastSync( private def handleBlockBodies(peer: Peer, requestedHashes: Seq[ByteString], blockBodies: Seq[BlockBody]) = { if (blockBodies.isEmpty) { val reason = - s"got empty block bodies response for known hashes: ${requestedHashes.map(h => Hex.toHexString(h.toArray[Byte]))}" + s"got empty block bodies response for known hashes: ${requestedHashes.map(ByteStringUtils.hash2string)}" blacklist(peer.id, blacklistDuration, reason) syncState = syncState.enqueueBlockBodies(requestedHashes) } else { @@ -436,127 +450,13 @@ class FastSync( processSyncing() } - private def handleNodeData(peer: Peer, requestedHashes: Seq[HashType], nodeData: NodeData) = { - if (nodeData.values.isEmpty && requestedHashes.nonEmpty) { - log.info(s"got empty mpt node response for known hashes from peer ${peer.id}: ${requestedHashes - .map(h => Hex.toHexString(h.v.toArray[Byte]))}") - blacklist(peer.id, blacklistDuration, "empty mpt node response for known hashes") - } - - val receivedHashes = nodeData.values.map(v => ByteString(kec256(v.toArray[Byte]))) - val remainingHashes = requestedHashes.filterNot(h => receivedHashes.contains(h.v)) - if (remainingHashes.nonEmpty) { - syncState = syncState.addPendingNodes(remainingHashes) - } - - val hashesToRequest = (nodeData.values.indices zip 
receivedHashes) flatMap { case (idx, valueHash) => - requestedHashes.find(_.v == valueHash) map { - case _: StateMptNodeHash => - handleMptNode(nodeData.getMptNode(idx)) - - case _: ContractStorageMptNodeHash => - handleContractMptNode(nodeData.getMptNode(idx)) - - case EvmCodeHash(hash) => - val evmCode = nodeData.values(idx) - blockchain.storeEvmCode(hash, evmCode).commit() - Nil - - case StorageRootHash(_) => - val rootNode = nodeData.getMptNode(idx) - handleContractMptNode(rootNode) - } - } - - val flattenedHashes = hashesToRequest.flatten - val downloadedNodes = syncState.downloadedNodesCount + nodeData.values.size - val newKnownNodes = downloadedNodes + flattenedHashes.size - - syncState = syncState - .addPendingNodes(flattenedHashes) - .copy( - downloadedNodesCount = downloadedNodes, - totalNodesCount = newKnownNodes - ) - - processSyncing() - } - - private def handleMptNode(mptNode: MptNode): Seq[HashType] = mptNode match { - case n: LeafNode => - import AccountImplicits._ - //if this fails it means that we have leaf node which is part of MPT that do not stores account - //we verify if node is paert of the tree by checking its hash before we call handleMptNode() in line 44 - val account = Try(n.value.toArray[Byte].toAccount) match { - case Success(acc) => Some(acc) - case Failure(e) => - log.debug(s"Leaf node without account, error while trying to decode account ${e.getMessage}") - None - } - - val evm = account.map(_.codeHash) - val storage = account.map(_.storageRoot) - - blockchain.saveNode(ByteString(n.hash), n.toBytes, syncState.pivotBlock.number) - - val evmRequests = evm - .filter(_ != Account.EmptyCodeHash) - .map(c => Seq(EvmCodeHash(c))) - .getOrElse(Nil) - - val storageRequests = storage - .filter(_ != Account.EmptyStorageRootHash) - .map(s => Seq(StorageRootHash(s))) - .getOrElse(Nil) - - evmRequests ++ storageRequests - - case n: BranchNode => - val hashes = n.children.collect { case HashNode(childHash) => childHash } - blockchain.saveNode(ByteString(n.hash), n.toBytes, syncState.pivotBlock.number) - hashes.map(e => StateMptNodeHash(ByteString(e))) - - case n: ExtensionNode => - blockchain.saveNode(ByteString(n.hash), n.toBytes, syncState.pivotBlock.number) - n.next match { - case HashNode(hashNode) => Seq(StateMptNodeHash(ByteString(hashNode))) - case _ => Nil - } - case _ => Nil - } - - private def handleContractMptNode(mptNode: MptNode): Seq[HashType] = { - mptNode match { - case n: LeafNode => - blockchain.saveNode(ByteString(n.hash), n.toBytes, syncState.pivotBlock.number) - Nil - - case n: BranchNode => - val hashes = n.children.collect { case HashNode(childHash) => childHash } - blockchain.saveNode(ByteString(n.hash), n.toBytes, syncState.pivotBlock.number) - hashes.map(e => ContractStorageMptNodeHash(ByteString(e))) - - case n: ExtensionNode => - blockchain.saveNode(ByteString(n.hash), n.toBytes, syncState.pivotBlock.number) - n.next match { - case HashNode(hashNode) => Seq(ContractStorageMptNodeHash(ByteString(hashNode))) - case _ => Nil - } - case _ => Nil - } - } - private def handleRequestFailure(peer: Peer, handler: ActorRef, reason: String) = { removeRequestHandler(handler) syncState = syncState - .addPendingNodes(requestedMptNodes.getOrElse(handler, Nil)) - .addPendingNodes(requestedNonMptNodes.getOrElse(handler, Nil)) .enqueueBlockBodies(requestedBlockBodies.getOrElse(handler, Nil)) .enqueueReceipts(requestedReceipts.getOrElse(handler, Nil)) - requestedMptNodes = requestedMptNodes - handler - requestedNonMptNodes = requestedNonMptNodes - handler 
requestedBlockBodies = requestedBlockBodies - handler requestedReceipts = requestedReceipts - handler @@ -581,8 +481,6 @@ class FastSync( private def persistSyncState(): Unit = { syncStateStorageActor ! syncState.copy( - pendingMptNodes = requestedMptNodes.values.flatten.toSeq.distinct ++ syncState.pendingMptNodes, - pendingNonMptNodes = requestedNonMptNodes.values.flatten.toSeq.distinct ++ syncState.pendingNonMptNodes, blockBodiesQueue = requestedBlockBodies.values.flatten.toSeq.distinct ++ syncState.blockBodiesQueue, receiptsQueue = requestedReceipts.values.flatten.toSeq.distinct ++ syncState.receiptsQueue ) @@ -623,8 +521,14 @@ class FastSync( if (fullySynced) { finish() } else { - if (anythingToDownload && !syncState.updatingPivotBlock) processDownloads() - else log.info("No more items to request, waiting for {} responses", assignedHandlers.size) + if (blockchainDataToDownload) { + processDownloads() + } else if (syncState.isBlockchainWorkFinished && !syncState.stateSyncFinished) { + // TODO ETCM-103 we are waiting for state sync to finish, we should probably cancel this loop, or look only + // for new target block + } else { + log.info("No more items to request, waiting for {} responses", assignedHandlers.size) + } } } @@ -662,15 +566,7 @@ class FastSync( .take(maxConcurrentRequests - assignedHandlers.size) .toSeq .sortBy(_.ref.toString()) - .foreach(assignWork) - } - } - - def assignWork(peer: Peer): Unit = { - if (syncState.bestBlockHeaderNumber < syncState.safeDownloadTarget || syncState.blockChainWorkQueued) { - assignBlockchainWork(peer) - } else { - requestNodes(peer) + .foreach(assignBlockchainWork) } } @@ -755,44 +651,13 @@ class FastSync( peerRequestsTime += (peer -> Instant.now()) } - def requestNodes(peer: Peer): Unit = { - if (syncState.pendingNonMptNodes.nonEmpty || syncState.pendingMptNodes.nonEmpty) { - val (nonMptNodesToGet, remainingNonMptNodes) = syncState.pendingNonMptNodes.splitAt(nodesPerRequest) - val (mptNodesToGet, remainingMptNodes) = - syncState.pendingMptNodes.splitAt(nodesPerRequest - nonMptNodesToGet.size) - val nodesToGet = nonMptNodesToGet ++ mptNodesToGet - log.info(s"Request ${nodesToGet.size} nodes from peer ${peer.id}") - val handler = context.actorOf( - PeerRequestHandler.props[GetNodeData, NodeData]( - peer, - peerResponseTimeout, - etcPeerManager, - peerEventBus, - requestMsg = GetNodeData(nodesToGet.map(_.v)), - responseMsgCode = NodeData.code - ) - ) - - context watch handler - assignedHandlers += (handler -> peer) - peerRequestsTime += (peer -> Instant.now()) - syncState = syncState.copy(pendingNonMptNodes = remainingNonMptNodes, pendingMptNodes = remainingMptNodes) - requestedMptNodes += handler -> mptNodesToGet - requestedNonMptNodes += handler -> nonMptNodesToGet - } else { - log.debug("There is node work to assign for peer") - } - } - def unassignedPeers: Set[Peer] = peersToDownloadFrom.keySet diff assignedHandlers.values.toSet - def anythingToDownload: Boolean = - syncState.anythingQueued || syncState.bestBlockHeaderNumber < syncState.safeDownloadTarget + def blockchainDataToDownload: Boolean = + syncState.blockChainWorkQueued || syncState.bestBlockHeaderNumber < syncState.safeDownloadTarget def fullySynced: Boolean = { - syncState.bestBlockHeaderNumber >= syncState.safeDownloadTarget && - !syncState.anythingQueued && - assignedHandlers.isEmpty + syncState.isBlockchainWorkFinished && assignedHandlers.isEmpty && syncState.stateSyncFinished } } @@ -842,14 +707,14 @@ object FastSync { private case class UpdatePivotBlock(state: 
FinalBlockProcessingResult) private case object ProcessSyncing + private[sync] case object PersistSyncState + private case object PrintStatus case class SyncState( pivotBlock: BlockHeader, safeDownloadTarget: BigInt = 0, - pendingMptNodes: Seq[HashType] = Nil, - pendingNonMptNodes: Seq[HashType] = Nil, blockBodiesQueue: Seq[ByteString] = Nil, receiptsQueue: Seq[ByteString] = Nil, downloadedNodesCount: Int = 0, @@ -857,7 +722,8 @@ object FastSync { bestBlockHeaderNumber: BigInt = 0, nextBlockToFullyValidate: BigInt = 1, pivotBlockUpdateFailures: Int = 0, - updatingPivotBlock: Boolean = false + updatingPivotBlock: Boolean = false, + stateSyncFinished: Boolean = false ) { def enqueueBlockBodies(blockBodies: Seq[ByteString]): SyncState = @@ -866,21 +732,6 @@ object FastSync { def enqueueReceipts(receipts: Seq[ByteString]): SyncState = copy(receiptsQueue = receiptsQueue ++ receipts) - def addPendingNodes(hashes: Seq[HashType]): SyncState = { - val (mpt, nonMpt) = hashes.partition { - case _: StateMptNodeHash | _: ContractStorageMptNodeHash => true - case _: EvmCodeHash | _: StorageRootHash => false - } - // Nodes are prepended in order to traverse mpt in-depth. For mpt nodes is not needed but to keep it consistent, - // it was applied too - copy(pendingMptNodes = mpt ++ pendingMptNodes, pendingNonMptNodes = nonMpt ++ pendingNonMptNodes) - } - - def anythingQueued: Boolean = - pendingNonMptNodes.nonEmpty || - pendingMptNodes.nonEmpty || - blockChainWorkQueued - def blockChainWorkQueued: Boolean = blockBodiesQueue.nonEmpty || receiptsQueue.nonEmpty def updateNextBlockToValidate(header: BlockHeader, K: Int, X: Int): SyncState = copy( @@ -904,6 +755,10 @@ object FastSync { safeDownloadTarget = newPivot.number + numberOfSafeBlocks, pivotBlockUpdateFailures = if (updateFailures) pivotBlockUpdateFailures + 1 else pivotBlockUpdateFailures ) + + def isBlockchainWorkFinished: Boolean = { + bestBlockHeaderNumber >= safeDownloadTarget && !blockChainWorkQueued + } } sealed trait HashType { @@ -919,12 +774,17 @@ object FastSync { case object Done sealed abstract class HeaderProcessingResult + case object HeadersProcessingFinished extends HeaderProcessingResult + case class ParentDifficultyNotFound(header: BlockHeader) extends HeaderProcessingResult + case class ValidationFailed(header: BlockHeader, peer: Peer) extends HeaderProcessingResult + case object ImportedPivotBlock extends HeaderProcessingResult sealed abstract class FinalBlockProcessingResult + case object ImportedLastBlock extends FinalBlockProcessingResult case object LastBlockValidationFailed extends FinalBlockProcessingResult } diff --git a/src/main/scala/io/iohk/ethereum/blockchain/sync/SyncStateDownloaderActor.scala b/src/main/scala/io/iohk/ethereum/blockchain/sync/SyncStateDownloaderActor.scala new file mode 100644 index 0000000000..3e6dc90376 --- /dev/null +++ b/src/main/scala/io/iohk/ethereum/blockchain/sync/SyncStateDownloaderActor.scala @@ -0,0 +1,322 @@ +package io.iohk.ethereum.blockchain.sync + +import akka.actor.{Actor, ActorLogging, ActorRef, Props, Scheduler, Timers} +import akka.util.ByteString +import cats.data.NonEmptyList +import io.iohk.ethereum.blockchain.sync.PeerRequestHandler.ResponseReceived +import io.iohk.ethereum.blockchain.sync.SyncStateDownloaderActor._ +import io.iohk.ethereum.blockchain.sync.SyncStateScheduler.SyncResponse +import io.iohk.ethereum.blockchain.sync.SyncStateSchedulerActor.{GetMissingNodes, MissingNodes} +import io.iohk.ethereum.crypto.kec256 +import 
io.iohk.ethereum.network.p2p.messages.PV63.{GetNodeData, NodeData}
+import io.iohk.ethereum.network.{Peer, PeerId}
+import io.iohk.ethereum.utils.Config.SyncConfig
+
+import scala.annotation.tailrec
+
+/**
+ * SyncStateDownloaderActor receives missing nodes from the scheduler and makes sure that those nodes are eventually retrieved.
+ * It never asks two peers for the same nodes.
+ * Another design choice would be to allow duplicate node retrieval and ignore duplicates at the scheduler level, but to do that
+ * SyncStateDownloaderActor would need to keep track of which peer was already asked for which node.
+ */
+class SyncStateDownloaderActor(
+    val etcPeerManager: ActorRef,
+    val peerEventBus: ActorRef,
+    val syncConfig: SyncConfig,
+    implicit val scheduler: Scheduler
+) extends Actor
+    with ActorLogging
+    with BlacklistSupport
+    with PeerListSupport
+    with Timers {
+
+  private def requestNodes(request: PeerRequest): ActorRef = {
+    log.info(s"Requesting ${request.nodes.size} nodes from peer ${request.peer}")
+    val handler = context.actorOf(
+      PeerRequestHandler.props[GetNodeData, NodeData](
+        request.peer,
+        syncConfig.peerResponseTimeout,
+        etcPeerManager,
+        peerEventBus,
+        requestMsg = GetNodeData(request.nodes.toList),
+        responseMsgCode = NodeData.code
+      )
+    )
+    context.watchWith(handler, RequestTerminated(request.peer))
+  }
+
+  private def getFreePeers(state: DownloaderState) = {
+    handshakedPeers.collect {
+      case (peer, _) if !state.activeRequests.contains(peer.id) && !isBlacklisted(peer.id) => peer
+    }
+  }
+
+  def checkPeerAvailabilityIfSomeDataQueued(state: DownloaderState): Unit = {
+    if (state.nodesToGet.nonEmpty) {
+      self ! GetMissingNodes(List())
+    }
+  }
+
+  def handleRequestResults(scheduler: ActorRef, currentState: DownloaderState): Receive = {
+    case ResponseReceived(peer, nodeData: NodeData, timeTaken) =>
+      log.info("Received {} state nodes in {} ms", nodeData.values.size, timeTaken)
+      context unwatch (sender())
+      currentState.handleRequestSuccess(peer, nodeData) match {
+        case (UnrequestedResponse, newState) =>
+          log.debug("Received unexpected response from {}", peer.id)
+          // just ignore unrequested stuff
+          context.become(downloading(scheduler, newState))
+        case (NoUsefulDataInResponse, newState) =>
+          log.debug("Received no useful data from peer {}, blacklisting", peer)
+          blacklist(peer.id, syncConfig.blacklistDuration, "Empty response")
+          checkPeerAvailabilityIfSomeDataQueued(newState)
+          context.become(downloading(scheduler, newState))
+        case (UsefulData(responsesToProcess), newState) =>
+          log.info("Received {} responses from peer {}", responsesToProcess.size, peer.id)
+          val currentCapacity =
+            ((getFreePeers(newState).size * syncConfig.nodesPerRequest) - newState.nodesToGet.size).max(0)
+          scheduler !
MissingNodes(responsesToProcess, currentCapacity) + // we got free peer lets re-schedule task assignment + checkPeerAvailabilityIfSomeDataQueued(newState) + context.become(downloading(scheduler, newState)) + } + + case PeerRequestHandler.RequestFailed(peer, reason) => + context unwatch (sender()) + log.debug(s"Request failed to peer {} due to {}", peer.id, reason) + checkPeerAvailabilityIfSomeDataQueued(currentState) + if (handshakedPeers.contains(peer)) { + blacklist(peer.id, syncConfig.blacklistDuration, reason) + } + context.become(downloading(scheduler, currentState.handleRequestFailure(peer))) + + case RequestTerminated(peer) => + log.debug(s"Request to {} terminated", peer.id) + checkPeerAvailabilityIfSomeDataQueued(currentState) + context.become(downloading(scheduler, currentState.handleRequestFailure(peer))) + } + + def idle: Receive = { case RegisterScheduler => + log.debug("Scheduler registered, starting sync download") + context.become(downloading(sender(), DownloaderState(Map.empty, Map.empty))) + } + + def handleCommonMessages: Receive = handlePeerListMessages orElse handleBlacklistMessages + + def downloading(scheduler: ActorRef, currentState: DownloaderState): Receive = + handleRequestResults(scheduler, currentState) orElse + handleCommonMessages orElse { case GetMissingNodes(newNodesToGet) => + val freePeers = getFreePeers(currentState) + if (freePeers.isEmpty) { + log.info("No available peer, rescheduling request for retrieval") + timers.startSingleTimer(CheckForPeersKey, GetMissingNodes(List.empty), syncConfig.syncRetryInterval) + context.become(downloading(scheduler, currentState.scheduleNewNodesForRetrieval(newNodesToGet))) + } else if (newNodesToGet.isEmpty && currentState.nodesToGet.isEmpty) { + log.info("No available work, waiting for additional requests") + } else { + val nodesToGet = if (newNodesToGet.isEmpty) None else Some(newNodesToGet) + val (newRequests, newState) = + currentState.assignTasksToPeers( + NonEmptyList.fromListUnsafe(freePeers.toList), + nodesToGet, + syncConfig.nodesPerRequest + ) + log.info( + "Creating {} new state node requests. 
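The downloaderCapacity reported back to the scheduler with each MissingNodes batch is plain arithmetic: free peers times nodesPerRequest, minus what is already queued, floored at zero. A tiny illustration with made-up numbers; capacityFor is a hypothetical helper, not part of the diff.

object DownloaderCapacityExample extends App {
  // Mirrors ((freePeers * nodesPerRequest) - queuedNodes).max(0) from the diff above.
  def capacityFor(freePeers: Int, nodesPerRequest: Int, queuedNodes: Int): Int =
    ((freePeers * nodesPerRequest) - queuedNodes).max(0)

  println(capacityFor(freePeers = 8, nodesPerRequest = 50, queuedNodes = 120)) // 280
  println(capacityFor(freePeers = 1, nodesPerRequest = 50, queuedNodes = 400)) // 0 - downloader is saturated
}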
Current request queue size is {}", + newRequests.size, + newState.nodesToGet.size + ) + newRequests.foreach { request => + requestNodes(request) + } + context.become(downloading(scheduler, newState)) + } + } + + override def receive: Receive = idle +} + +object SyncStateDownloaderActor { + def props(etcPeerManager: ActorRef, peerEventBus: ActorRef, syncConfig: SyncConfig, scheduler: Scheduler): Props = { + Props(new SyncStateDownloaderActor(etcPeerManager, peerEventBus, syncConfig, scheduler)) + } + + final case object CheckForPeersKey + + final case class RequestTerminated(to: Peer) + + final case class PeerRequest(peer: Peer, nodes: NonEmptyList[ByteString]) + + final case object RegisterScheduler + + sealed trait ResponseProcessingResult + + case object UnrequestedResponse extends ResponseProcessingResult + + case object NoUsefulDataInResponse extends ResponseProcessingResult + + case class UsefulData(responses: List[SyncResponse]) extends ResponseProcessingResult + + final case class DownloaderState( + activeRequests: Map[PeerId, NonEmptyList[ByteString]], + nodesToGet: Map[ByteString, Option[PeerId]] + ) { + lazy val nonDownloadedNodes = nodesToGet.collect { + case (hash, maybePeer) if maybePeer.isEmpty => hash + }.toSeq + + def scheduleNewNodesForRetrieval(nodes: Seq[ByteString]): DownloaderState = { + val newNodesToGet = nodes.foldLeft(nodesToGet) { case (map, node) => + if (map.contains(node)) { + map + } else { + map + (node -> None) + } + } + + copy(nodesToGet = newNodesToGet) + } + + private def addActiveRequest(peerRequest: PeerRequest): DownloaderState = { + val newNodesToget = peerRequest.nodes.foldLeft(nodesToGet) { case (map, node) => + map + (node -> Some(peerRequest.peer.id)) + } + + copy(activeRequests = activeRequests + (peerRequest.peer.id -> peerRequest.nodes), nodesToGet = newNodesToget) + } + + def handleRequestFailure(from: Peer): DownloaderState = { + val requestedNodes = activeRequests(from.id) + val newNodesToGet = requestedNodes.foldLeft(nodesToGet) { case (map, node) => + map + (node -> None) + } + + copy(activeRequests = activeRequests - from.id, nodesToGet = newNodesToGet) + } + + /** + * Responses from peers should be delivered in order, but can contain gaps or can be not full, so we cannot fail + * on first not matching response. 
+ * Matched responses are returned in correct order, the hashes to be rescheduled are returned in no particular order + * as they will either way end up in map of hashes to be re-downloaded + */ + def process( + requested: NonEmptyList[ByteString], + received: NonEmptyList[ByteString] + ): (List[ByteString], List[SyncResponse]) = { + @tailrec + def go( + remainingRequestedHashes: List[ByteString], + nextResponse: SyncResponse, + remainingResponses: List[ByteString], + nonReceivedRequested: List[ByteString], + processed: List[SyncResponse] + ): (List[ByteString], List[SyncResponse]) = { + if (remainingRequestedHashes.isEmpty) { + (nonReceivedRequested, processed.reverse) + } else { + val nextRequestedHash = remainingRequestedHashes.head + if (nextRequestedHash == nextResponse.hash) { + if (remainingResponses.isEmpty) { + val finalNonReceived = remainingRequestedHashes.tail ::: nonReceivedRequested + val finalProcessed = nextResponse :: processed + (finalNonReceived, finalProcessed.reverse) + } else { + val nexExpectedResponse = SyncResponse(kec256(remainingResponses.head), remainingResponses.head) + go( + remainingRequestedHashes.tail, + nexExpectedResponse, + remainingResponses.tail, + nonReceivedRequested, + nextResponse :: processed + ) + } + } else { + go( + remainingRequestedHashes.tail, + nextResponse, + remainingResponses, + nextRequestedHash :: nonReceivedRequested, + processed + ) + } + } + } + + val firstReceivedResponse = SyncResponse(kec256(received.head), received.head) + + go(requested.toList, firstReceivedResponse, received.tail, List.empty, List.empty) + } + + def handleRequestSuccess(from: Peer, receivedMessage: NodeData): (ResponseProcessingResult, DownloaderState) = { + activeRequests + .get(from.id) + .map { requestedHashes => + if (receivedMessage.values.isEmpty) { + val rescheduleRequestedHashes = requestedHashes.foldLeft(nodesToGet) { case (map, hash) => + map + (hash -> None) + } + ( + NoUsefulDataInResponse, + copy(activeRequests = activeRequests - from.id, nodesToGet = rescheduleRequestedHashes) + ) + } else { + val (notReceived, received) = + process(requestedHashes, NonEmptyList.fromListUnsafe(receivedMessage.values.toList)) + if (received.isEmpty) { + val rescheduleRequestedHashes = notReceived.foldLeft(nodesToGet) { case (map, hash) => + map + (hash -> None) + } + ( + NoUsefulDataInResponse, + copy(activeRequests = activeRequests - from.id, nodesToGet = rescheduleRequestedHashes) + ) + } else { + val afterNotReceive = notReceived.foldLeft(nodesToGet) { case (map, hash) => map + (hash -> None) } + val afterReceived = received.foldLeft(afterNotReceive) { case (map, received) => map - received.hash } + (UsefulData(received), copy(activeRequests = activeRequests - from.id, nodesToGet = afterReceived)) + } + } + } + .getOrElse((UnrequestedResponse, this)) + } + + def assignTasksToPeers( + peers: NonEmptyList[Peer], + newNodes: Option[Seq[ByteString]], + nodesPerPeerCapacity: Int + ): (Seq[PeerRequest], DownloaderState) = { + @tailrec + def go( + peersRemaining: List[Peer], + nodesRemaining: Seq[ByteString], + createdRequests: List[PeerRequest], + currentState: DownloaderState + ): (Seq[PeerRequest], DownloaderState) = { + if (peersRemaining.isEmpty || nodesRemaining.isEmpty) { + (createdRequests.reverse, currentState.scheduleNewNodesForRetrieval(nodesRemaining)) + } else { + val nextPeer = peersRemaining.head + val (nodes, nodesAfterAssignment) = nodesRemaining.splitAt(nodesPerPeerCapacity) + val peerRequest = PeerRequest(nextPeer, 
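The process helper above pairs requested hashes with a possibly partial, gap-containing response by walking both lists in request order. The sketch below shows the same matching idea in a dependency-free form: it uses plain Lists instead of cats NonEmptyList and SHA-256 as a stand-in for the project's kec256, purely so it runs on its own.

import java.security.MessageDigest

object ResponseMatchingSketch extends App {
  // Stand-in hash; the real code uses Keccak-256 (kec256).
  def hash(data: Array[Byte]): Seq[Byte] = MessageDigest.getInstance("SHA-256").digest(data).toSeq

  /** Returns (requested hashes that were not answered, responses matched in request order). */
  def matchResponses(requested: List[Seq[Byte]], received: List[Array[Byte]]): (List[Seq[Byte]], List[Array[Byte]]) =
    received match {
      case Nil => (requested, Nil)
      case next :: rest =>
        requested match {
          case Nil => (Nil, Nil)
          case reqHead :: reqTail =>
            if (reqHead == hash(next)) {
              val (missing, matched) = matchResponses(reqTail, rest)
              (missing, next :: matched)
            } else {
              // Gap: the peer skipped this request, so reschedule it and keep matching.
              val (missing, matched) = matchResponses(reqTail, received)
              (reqHead :: missing, matched)
            }
        }
    }

  val a = "node-a".getBytes; val b = "node-b".getBytes; val c = "node-c".getBytes
  // The peer was asked for a, b and c but only returned b and c.
  val (missing, matched) = matchResponses(List(hash(a), hash(b), hash(c)), List(b, c))
  println(s"missing = ${missing.size}, matched = ${matched.size}") // missing = 1, matched = 2
}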
NonEmptyList.fromListUnsafe(nodes.toList))
+          go(
+            peersRemaining.tail,
+            nodesAfterAssignment,
+            peerRequest :: createdRequests,
+            currentState.addActiveRequest(peerRequest)
+          )
+        }
+      }
+
+      val currentNodesToDeliver = newNodes.map(nodes => nonDownloadedNodes ++ nodes).getOrElse(nonDownloadedNodes)
+      if (currentNodesToDeliver.isEmpty) {
+        (Seq(), this)
+      } else {
+        go(peers.toList, currentNodesToDeliver, List.empty, this)
+      }
+    }
+
+  }
+
+}
diff --git a/src/main/scala/io/iohk/ethereum/blockchain/sync/SyncStateScheduler.scala b/src/main/scala/io/iohk/ethereum/blockchain/sync/SyncStateScheduler.scala
new file mode 100644
index 0000000000..77ad4d9d1a
--- /dev/null
+++ b/src/main/scala/io/iohk/ethereum/blockchain/sync/SyncStateScheduler.scala
@@ -0,0 +1,451 @@
+package io.iohk.ethereum.blockchain.sync
+
+import java.util.Comparator
+
+import akka.util.ByteString
+import io.iohk.ethereum.blockchain.sync.SyncStateScheduler._
+import io.iohk.ethereum.domain.{Account, Blockchain}
+import io.iohk.ethereum.mpt.{BranchNode, ExtensionNode, HashNode, LeafNode, MerklePatriciaTrie, MptNode}
+import io.vavr.collection.PriorityQueue
+import io.iohk.ethereum.network.p2p.messages.PV63.MptNodeEncoders._
+
+import scala.annotation.tailrec
+import scala.util.Try
+
+/**
+ * Scheduler which traverses the Merkle Patricia Trie in DFS fashion, creating requests for the nodes that are missing
+ * from the traversed trie.
+ * Traversal example: Merkle Patricia Trie with 2 leaf child nodes, each with a non-empty code value.
+ * Final state:
+ *                 BranchNode(hash: 1)
+ *                /                   \
+ *   Leaf(hash:2, codeHash:3)   Leaf(hash:4, codeHash:5)
+ *
+ * Initial state:
+ * At the initial state there is only: (hash: 1)
+ *
+ * Traversal, node by node:
+ * 1. Ask for root. After receive: (NodesToGet:[Hash:2, Hash:4], nodesToSave: [])
+ * 2. Ask for (Hash:2). After receive: (NodesToGet:[CodeHash:3, Hash:4], nodesToSave: [])
+ * 3. Ask for (CodeHash:3). After receive: (NodesToGet:[Hash:4], nodesToSave: [Leaf(hash:2, codeHash:3)])
+ * 4. Ask for (Hash:4). After receive: (NodesToGet:[CodeHash:5], nodesToSave: [Leaf(hash:2, codeHash:3)])
+ * 5. Ask for (CodeHash:5). After receive:
+ *    (NodesToGet:[], nodesToSave: [Leaf(hash:2, codeHash:3), Leaf(hash:4, codeHash:5), BranchNode(hash: 1)])
+ *
+ * A BranchNode is only committed for saving when all of its leaf nodes have been retrieved, together with all children
+ * of those leaf nodes, i.e. storage and code.
+ *
+ * SyncStateScheduler is agnostic to how SchedulerState is handled; it can be kept in a var in an actor, or in a cats Ref.
+ *
+ * The important part is that nodes retrieved by getMissingNodes must eventually be provided back for the scheduler to
+ * make progress.
+ */
+class SyncStateScheduler(blockchain: Blockchain) {
+
+  def initState(targetRootHash: ByteString): Option[SchedulerState] = {
+    if (targetRootHash == emptyStateRootHash) {
+      None
+    } else if (blockchain.getMptNodeByHash(targetRootHash).isDefined) {
+      None
+    } else {
+      val initialState = SchedulerState()
+      val initialRequest = StateNodeRequest(targetRootHash, None, StateNode, Seq(), 0, 0)
+      Option(initialState.schedule(initialRequest))
+    }
+  }
+
+  /**
+   * Default response processor which ignores duplicated or not requested hashes, but informs the caller about critical
+   * errors.
+   * If it proved valuable, it would be possible to implement a processor which gathers statistics about duplicated or
+   * not requested data.
+ */ + def processResponses( + state: SchedulerState, + responses: List[SyncResponse] + ): Either[CriticalError, (SchedulerState, ProcessingStatistics)] = { + @tailrec + def go( + currentState: SchedulerState, + currentStatistics: ProcessingStatistics, + remaining: Seq[SyncResponse] + ): Either[CriticalError, (SchedulerState, ProcessingStatistics)] = { + if (remaining.isEmpty) { + Right((currentState, currentStatistics)) + } else { + val responseToProcess = remaining.head + processResponse(currentState, responseToProcess) match { + case Left(value) => + value match { + case error: CriticalError => + Left(error) + case err: NotCriticalError => + err match { + case SyncStateScheduler.NotRequestedItem => + go(currentState, currentStatistics.addNotRequested(), remaining.tail) + case SyncStateScheduler.AlreadyProcessedItem => + go(currentState, currentStatistics.addDuplicated(), remaining.tail) + } + } + + case Right(newState) => + go(newState, currentStatistics, remaining.tail) + } + } + } + + go(state, ProcessingStatistics(), responses) + } + + def getMissingNodes(state: SchedulerState, max: Int): (List[ByteString], SchedulerState) = { + state.getMissingHashes(max) + } + + def getAllMissingNodes(state: SchedulerState): (List[ByteString], SchedulerState) = { + getMissingNodes(state, state.numberOfMissingHashes) + } + + def persistBatch(state: SchedulerState, targetBlockNumber: BigInt): SchedulerState = { + val (nodes, newState) = state.getNodesToPersist + nodes.foreach { case (hash, (data, reqType)) => + reqType match { + case _: CodeRequest => + blockchain.storeEvmCode(hash, data).commit() + case _: NodeRequest => + blockchain.saveNode(hash, data.toArray, targetBlockNumber) + } + } + newState + } + + private def isRequestAlreadyKnownOrResolved( + state: SchedulerState, + response: SyncResponse + ): Either[ResponseProcessingError, StateNodeRequest] = { + for { + activeRequest <- state.getPendingRequestByHash(response.hash).toRight(NotRequestedItem) + _ <- if (activeRequest.resolvedData.isDefined) Left(AlreadyProcessedItem) else Right(()) + } yield activeRequest + } + + private def processActiveResponse( + state: SchedulerState, + activeRequest: StateNodeRequest, + response: SyncResponse + ): Either[ResponseProcessingError, SchedulerState] = { + activeRequest.requestType match { + case _: CodeRequest => Right(state.commit(activeRequest.copy(resolvedData = Some(response.data)))) + case requestType: NodeRequest => + for { + mptNode <- Try(response.data.toArray.toMptNode).toEither.left.map(_ => CannotDecodeMptNode) + possibleChildRequests <- createPossibleChildRequests(mptNode, activeRequest, requestType) + } yield { + val childWithoutAlreadyKnown = + possibleChildRequests.filterNot(req => isRequestedHashAlreadyCommitted(state, req)) + if (childWithoutAlreadyKnown.isEmpty && activeRequest.dependencies == 0) { + state.commit(activeRequest.copy(resolvedData = Some(response.data))) + } else { + state.resolveRequest(activeRequest, response.data, childWithoutAlreadyKnown) + } + } + } + } + + def processResponse( + state: SchedulerState, + response: SyncResponse + ): Either[ResponseProcessingError, SchedulerState] = { + for { + activeRequest <- isRequestAlreadyKnownOrResolved(state, response) + newState <- processActiveResponse(state, activeRequest, response) + } yield newState + + } + // scalastyle:off method.length + private def createPossibleChildRequests( + mptNode: MptNode, + parentRequest: StateNodeRequest, + requestType: NodeRequest + ): Either[NotAccountLeafNode.type, Seq[StateNodeRequest]] = 
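Taken together, initState, getMissingNodes, processResponses and persistBatch form a small pull-based loop that the caller is expected to drive. The sketch below shows that loop under the assumption that fetchFromNetwork is some hypothetical function returning SyncResponse values for the requested hashes; in this diff the SyncStateSchedulerActor and SyncStateDownloaderActor play that role for real.

import akka.util.ByteString
import io.iohk.ethereum.blockchain.sync.SyncStateScheduler
import io.iohk.ethereum.blockchain.sync.SyncStateScheduler.{SchedulerState, SyncResponse}

import scala.annotation.tailrec

object SchedulerDriverSketch {
  // Illustrative only: `fetch` stands for whatever delivers node data (network, test fixture, ...).
  def syncTo(
      scheduler: SyncStateScheduler,
      targetRoot: ByteString,
      targetBlock: BigInt,
      batchSize: Int
  )(fetch: List[ByteString] => List[SyncResponse]): Unit = {
    @tailrec
    def loop(state: SchedulerState): Unit = {
      val (missing, stateAfterDequeue) = scheduler.getMissingNodes(state, batchSize)
      if (missing.isEmpty) {
        // Nothing left to ask for: flush whatever is still in the mem batch and stop.
        scheduler.persistBatch(stateAfterDequeue, targetBlock)
        ()
      } else {
        scheduler.processResponses(stateAfterDequeue, fetch(missing)) match {
          case Left(criticalError)  => throw new IllegalStateException(s"State sync failed: $criticalError")
          case Right((newState, _)) => loop(scheduler.persistBatch(newState, targetBlock))
        }
      }
    }

    // initState returns None when the root is the empty root or is already present in storage.
    scheduler.initState(targetRoot).foreach(loop)
  }
}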
mptNode match { + case n: LeafNode => + requestType match { + case SyncStateScheduler.StateNode => + Account(n.value).toEither.left.map(_ => NotAccountLeafNode).map { account => + // We are scheduling both storage trie and code requests with highest priority to be sure that leaf nodes completed + // as fast as possible + val evmRequests = if (account.codeHash != emptyCodeHash) { + Seq(StateNodeRequest(account.codeHash, None, Code, Seq(parentRequest.nodeHash), maxMptTrieDepth, 0)) + } else { + Seq() + } + + val storageRequests = if (account.storageRoot != emptyStateRootHash) { + Seq( + StateNodeRequest( + account.storageRoot, + None, + StorageNode, + Seq(parentRequest.nodeHash), + maxMptTrieDepth, + 0 + ) + ) + } else { + Seq() + } + + evmRequests ++ storageRequests + } + + case SyncStateScheduler.StorageNode => + Right(Seq()) + } + + case n: BranchNode => + Right(n.children.collect { case HashNode(childHash) => + StateNodeRequest( + ByteString.fromArrayUnsafe(childHash), + None, + requestType, + Seq(parentRequest.nodeHash), + parentRequest.nodeDepth + 1, + 0 + ) + }) + + case n: ExtensionNode => + Right(n.next match { + case HashNode(hash) => + Seq( + StateNodeRequest( + ByteString(hash), + None, + requestType, + Seq(parentRequest.nodeHash), + parentRequest.nodeDepth + n.sharedKey.size, + 0 + ) + ) + case _ => Nil + }) + case _ => Right(Nil) + } + + private def isInDatabase(req: StateNodeRequest): Boolean = { + req.requestType match { + case request: CodeRequest => + blockchain.getEvmCodeByHash(req.nodeHash).isDefined + case request: NodeRequest => + blockchain.getMptNodeByHash(req.nodeHash).isDefined + } + } + + private def isRequestedHashAlreadyCommitted(state: SchedulerState, req: StateNodeRequest): Boolean = { + // TODO [ETCM-103] add bloom filter step before data base to speed things up. Bloomfilter will need to be reloaded after node + // restart. This can be done by exposing RockDb iterator to traverse whole mptnode storage. 
+ // Another possibility is that there is some light way alternative in rocksdb to check key existence + state.memBatch.contains(req.nodeHash) || isInDatabase(req) + } +} + +object SyncStateScheduler { + private val emptyStateRootHash = ByteString(MerklePatriciaTrie.EmptyRootHash) + private val emptyCodeHash = Account.EmptyCodeHash + private val maxMptTrieDepth = 64 + + sealed abstract class RequestType + + sealed abstract class CodeRequest extends RequestType + + case object Code extends CodeRequest + + sealed abstract class NodeRequest extends RequestType + + case object StateNode extends NodeRequest + + case object StorageNode extends NodeRequest + + def apply(blockchain: Blockchain): SyncStateScheduler = { + new SyncStateScheduler(blockchain) + } + + final case class StateNodeRequest( + nodeHash: ByteString, + resolvedData: Option[ByteString], + requestType: RequestType, + parents: Seq[ByteString], + nodeDepth: Int, + dependencies: Int + ) { + def isNodeRequest: Boolean = requestType match { + case _: CodeRequest => false + case _: NodeRequest => true + } + } + + private val stateNodeRequestComparator = new Comparator[StateNodeRequest] { + override def compare(o1: StateNodeRequest, o2: StateNodeRequest): Int = { + o2.nodeDepth compare o1.nodeDepth + } + } + + implicit class Tuple2Ops[A, B](o: io.vavr.Tuple2[A, B]) { + def asScala(): (A, B) = (o._1, o._2) + } + + final case class SyncResponse(hash: ByteString, data: ByteString) + + case class SchedulerState( + activeRequest: Map[ByteString, StateNodeRequest], + queue: PriorityQueue[StateNodeRequest], + memBatch: Map[ByteString, (ByteString, RequestType)] + ) { + + def schedule(request: StateNodeRequest): SchedulerState = { + activeRequest.get(request.nodeHash) match { + case Some(oldRequest) => + copy(activeRequest + (request.nodeHash -> oldRequest.copy(parents = oldRequest.parents ++ request.parents))) + + case None => + copy(activeRequest + (request.nodeHash -> request), queue.enqueue(request)) + } + } + + def getMissingHashes(max: Int): (List[ByteString], SchedulerState) = { + @tailrec + def go( + currentQueue: PriorityQueue[StateNodeRequest], + remaining: Int, + got: List[ByteString] + ): (PriorityQueue[StateNodeRequest], List[ByteString]) = { + if (remaining == 0) { + (currentQueue, got.reverse) + } else if (currentQueue.isEmpty) { + (currentQueue, got.reverse) + } else { + val (elem, newQueue) = currentQueue.dequeue().asScala() + go(newQueue, remaining - 1, elem.nodeHash :: got) + } + } + + val (newQueue, elements) = go(queue, max, List.empty) + (elements, copy(queue = newQueue)) + } + + def getAllMissingHashes: (List[ByteString], SchedulerState) = getMissingHashes(queue.size()) + + def numberOfPendingRequests: Int = activeRequest.size + + def getPendingRequestByHash(hash: ByteString): Option[StateNodeRequest] = activeRequest.get(hash) + + def numberOfMissingHashes: Int = queue.size() + + def commit(request: StateNodeRequest): SchedulerState = { + @tailrec + def go( + currentRequests: Map[ByteString, StateNodeRequest], + currentBatch: Map[ByteString, (ByteString, RequestType)], + parentsToCheck: Seq[ByteString] + ): (Map[ByteString, StateNodeRequest], Map[ByteString, (ByteString, RequestType)]) = { + if (parentsToCheck.isEmpty) { + (currentRequests, currentBatch) + } else { + val parent = parentsToCheck.head + // if the parent is not there, something is terribly wrong and our assumptions do not hold, it is perfectly fine to + // fail with exception + val parentRequest = currentRequests.getOrElse( + parent, + throw new 
IllegalStateException( + "Critical error. Missing parent " + + s"with hash ${parent}" + ) + ) + val newParentDeps = parentRequest.dependencies - 1 + if (newParentDeps == 0) { + // we can always call `parentRequest.resolvedData.get` on parent node, as to even have children parent data + // needs to be provided + go( + currentRequests - parent, + currentBatch + (parent -> (parentRequest.resolvedData.getOrElse( + throw new IllegalStateException( + s"Critical error. Parent ${parentRequest.nodeHash} without resolved data" + ) + ), parentRequest.requestType)), + parentsToCheck.tail ++ parentRequest.parents + ) + } else { + go( + currentRequests + (parent -> parentRequest.copy(dependencies = newParentDeps)), + currentBatch, + parentsToCheck.tail + ) + } + } + } + + val newActive = activeRequest - request.nodeHash + val newMemBatch = memBatch + (request.nodeHash -> (request.resolvedData.get, request.requestType)) + + val (newRequests, newBatch) = go(newActive, newMemBatch, request.parents) + copy(activeRequest = newRequests, memBatch = newBatch) + } + + def resolveRequest( + request: StateNodeRequest, + receivedData: ByteString, + requestNewChildren: Seq[StateNodeRequest] + ): SchedulerState = { + val numberOfChildren = requestNewChildren.size + val resolvedStateNodeRequest = + request.copy(resolvedData = Some(receivedData), dependencies = request.dependencies + numberOfChildren) + val newRequests = activeRequest + (request.nodeHash -> resolvedStateNodeRequest) + val stateWithUpdatedParent = copy(activeRequest = newRequests) + requestNewChildren.foldLeft(stateWithUpdatedParent) { case (state, child) => state.schedule(child) } + } + + def getNodesToPersist: (Seq[(ByteString, (ByteString, RequestType))], SchedulerState) = { + (memBatch.toSeq, copy(memBatch = Map.empty)) + } + } + + object SchedulerState { + def apply(): SchedulerState = { + new SchedulerState( + Map.empty[ByteString, StateNodeRequest], + PriorityQueue.empty(stateNodeRequestComparator), + Map.empty + ) + } + } + + case object ProcessingSuccess + + sealed trait ResponseProcessingError + + sealed trait CriticalError extends ResponseProcessingError + + case object CannotDecodeMptNode extends CriticalError + + case object NotAccountLeafNode extends CriticalError + + sealed trait NotCriticalError extends ResponseProcessingError + + case object NotRequestedItem extends NotCriticalError + + case object AlreadyProcessedItem extends NotCriticalError + + final case class ProcessingStatistics(duplicatedHashes: Long, notRequestedHashes: Long, saved: Long) { + def addNotRequested(): ProcessingStatistics = copy(notRequestedHashes = notRequestedHashes + 1) + def addDuplicated(): ProcessingStatistics = copy(duplicatedHashes = duplicatedHashes + 1) + def addSaved(newSaved: Long): ProcessingStatistics = copy(saved = saved + newSaved) + def addStats(that: ProcessingStatistics): ProcessingStatistics = + copy( + duplicatedHashes = that.duplicatedHashes, + notRequestedHashes = that.notRequestedHashes, + saved = saved + that.saved + ) + } + + object ProcessingStatistics { + def apply(): ProcessingStatistics = new ProcessingStatistics(0, 0, 0) + } + +} diff --git a/src/main/scala/io/iohk/ethereum/blockchain/sync/SyncStateSchedulerActor.scala b/src/main/scala/io/iohk/ethereum/blockchain/sync/SyncStateSchedulerActor.scala new file mode 100644 index 0000000000..929aab6d6d --- /dev/null +++ b/src/main/scala/io/iohk/ethereum/blockchain/sync/SyncStateSchedulerActor.scala @@ -0,0 +1,136 @@ +package io.iohk.ethereum.blockchain.sync + +import akka.actor.{Actor, 
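The commit path above is the part worth internalizing: every pending request carries a dependencies counter, and a parent is only moved into the mem batch once the counters of all its ancestors drop to zero. A small standalone model of that cascading count follows; Pending and the string hashes are hypothetical stand-ins, not the project's types.

import scala.collection.mutable

object DependencyCountingSketch extends App {
  // Minimal stand-in for the scheduler's request bookkeeping.
  final case class Pending(parent: Option[String], var dependencies: Int)

  // "root" waits for two leaves; leaf "a" additionally waits for its contract code.
  val pending = mutable.Map(
    "root"  -> Pending(None, dependencies = 2),
    "a"     -> Pending(Some("root"), dependencies = 1),
    "b"     -> Pending(Some("root"), dependencies = 0),
    "codeA" -> Pending(Some("a"), dependencies = 0)
  )
  val memBatch = mutable.ListBuffer.empty[String]

  // Mirrors the cascading part of SchedulerState.commit: a finished node decrements its
  // parent's counter, and the parent itself is committed once that counter reaches zero.
  def commit(hash: String): Unit = {
    val entry = pending.remove(hash).getOrElse(sys.error(s"missing $hash"))
    memBatch += hash
    entry.parent.foreach { parentHash =>
      val parent = pending(parentHash)
      parent.dependencies -= 1
      if (parent.dependencies == 0) commit(parentHash)
    }
  }

  commit("b")     // root: 2 -> 1, nothing else is committed yet
  commit("codeA") // "a": 1 -> 0, so "a" commits, which drops root to 0 and commits it too
  println(memBatch.mkString(", ")) // b, codeA, a, root
}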
ActorLogging, ActorRef, Props, Timers}
+import akka.util.ByteString
+import io.iohk.ethereum.blockchain.sync.SyncStateDownloaderActor.RegisterScheduler
+import io.iohk.ethereum.blockchain.sync.SyncStateScheduler.{ProcessingStatistics, SchedulerState, SyncResponse}
+import io.iohk.ethereum.blockchain.sync.SyncStateSchedulerActor.{
+  GetMissingNodes,
+  MissingNodes,
+  PrintInfo,
+  PrintInfoKey,
+  StartSyncingTo,
+  StateSyncFinished,
+  maxMembatchSize
+}
+import io.iohk.ethereum.utils.ByteStringUtils
+
+import scala.concurrent.duration._
+
+class SyncStateSchedulerActor(downloader: ActorRef, sync: SyncStateScheduler)
+    extends Actor
+    with ActorLogging
+    with Timers {
+
+  def idle: Receive = { case StartSyncingTo(root, bn) =>
+    timers.startTimerAtFixedRate(PrintInfoKey, PrintInfo, 30.seconds)
+    log.info(s"Starting state sync to root ${ByteStringUtils.hash2string(root)} on block ${bn}")
+    // TODO handle the case when we already have the root, i.e. state is synced up to this point
+    val initState = sync.initState(root).get
+    val (initialMissing, state1) = initState.getAllMissingHashes
+    downloader ! RegisterScheduler
+    downloader ! GetMissingNodes(initialMissing)
+    context become (syncing(state1, ProcessingStatistics(), bn, sender()))
+  }
+
+  private def finalizeSync(state: SchedulerState, targetBlock: BigInt, syncInitiator: ActorRef): Unit = {
+    if (state.memBatch.nonEmpty) {
+      log.debug(s"Persisting ${state.memBatch.size} elements to blockchain and finalizing the state sync")
+      sync.persistBatch(state, targetBlock)
+      syncInitiator ! StateSyncFinished
+      context.become(idle)
+    } else {
+      log.info("Finalizing the state sync")
+      syncInitiator ! StateSyncFinished
+      context.become(idle)
+    }
+  }
+
+  // scalastyle:off method.length
+  def syncing(
+      currentState: SchedulerState,
+      currentStats: ProcessingStatistics,
+      targetBlock: BigInt,
+      syncInitiator: ActorRef
+  ): Receive = {
+    case MissingNodes(nodes, downloaderCap) =>
+      log.debug("Received {} new nodes to process", nodes.size)
+      // The current SyncStateDownloaderActor makes sure that there are no unrequested or duplicated values in its
+      // response, so we can ignore those errors.
+      // TODO make processing async, as sometimes the downloader sits idle
+      sync.processResponses(currentState, nodes) match {
+        case Left(value) =>
+          log.error(s"Critical error while state syncing ${value}, stopping state sync")
+          // TODO we should probably restart the sync from a new target block, as the current trie is malformed, or
+          // declare fast sync a failure and start regular sync from scratch
+          context.stop(self)
+        case Right((newState, statistics)) =>
+          if (newState.numberOfPendingRequests == 0) {
+            finalizeSync(newState, targetBlock, syncInitiator)
+          } else {
+            log.debug(
+              "There are {} pending state requests, " +
+                "missing queue size is {} elements",
+              newState.numberOfPendingRequests,
+              newState.queue.size()
+            )
+
+            val (missing, state2) = sync.getMissingNodes(newState, downloaderCap)
+
+            if (missing.nonEmpty) {
+              log.debug("Asking downloader for {} missing state nodes", missing.size)
+              downloader !
GetMissingNodes(missing) + } + + if (state2.memBatch.size >= maxMembatchSize) { + log.debug("Current membatch size is {}, persisting nodes to database", state2.memBatch.size) + val state3 = sync.persistBatch(state2, targetBlock) + context.become( + syncing( + state3, + currentStats.addStats(statistics).addSaved(state2.memBatch.size), + targetBlock, + syncInitiator + ) + ) + } else { + context.become(syncing(state2, currentStats.addStats(statistics), targetBlock, syncInitiator)) + } + } + } + + case PrintInfo => + val syncStats = s""" Status of mpt state sync: + | Number of Pending requests: ${currentState.numberOfPendingRequests}, + | Number of Missing hashes waiting to be retrieved: ${currentState.queue.size()}, + | Number of Mpt nodes saved to database: ${currentStats.saved}, + | Number of duplicated hashes: ${currentStats.duplicatedHashes}, + | Number of not requested hashes: ${currentStats.notRequestedHashes}, + """.stripMargin + + log.info(syncStats) + } + + override def receive: Receive = idle + +} + +object SyncStateSchedulerActor { + def props(downloader: ActorRef, sync: SyncStateScheduler): Props = { + Props(new SyncStateSchedulerActor(downloader, sync)) + } + + final case object PrintInfo + final case object PrintInfoKey + + case class StartSyncingTo(stateRoot: ByteString, blockNumber: BigInt) + + case class GetMissingNodes(nodesToGet: List[ByteString]) + + case class MissingNodes(missingNodes: List[SyncResponse], downloaderCapacity: Int) + + case object StateSyncFinished + + // TODO determine this number of maybe it should be put to config + val maxMembatchSize = 10000 +} diff --git a/src/main/scala/io/iohk/ethereum/domain/Account.scala b/src/main/scala/io/iohk/ethereum/domain/Account.scala index c3927b6ff7..455cec90d1 100644 --- a/src/main/scala/io/iohk/ethereum/domain/Account.scala +++ b/src/main/scala/io/iohk/ethereum/domain/Account.scala @@ -8,11 +8,14 @@ import io.iohk.ethereum.rlp import io.iohk.ethereum.rlp.RLPImplicits._ import org.bouncycastle.util.encoders.Hex +import scala.util.Try + object Account { val EmptyStorageRootHash = ByteString(kec256(rlp.encode(Array.empty[Byte]))) val EmptyCodeHash: ByteString = kec256(ByteString()) - def empty(startNonce: UInt256 = UInt256.Zero): Account = Account(nonce = startNonce, storageRoot = EmptyStorageRootHash, codeHash = EmptyCodeHash) + def empty(startNonce: UInt256 = UInt256.Zero): Account = + Account(nonce = startNonce, storageRoot = EmptyStorageRootHash, codeHash = EmptyCodeHash) implicit val accountSerializer = new ByteArraySerializable[Account] { @@ -22,13 +25,16 @@ object Account { override def toBytes(input: Account): Array[Byte] = input.toBytes } + + def apply(bytes: ByteString): Try[Account] = Try(accountSerializer.fromBytes(bytes.toArray)) } case class Account( - nonce: UInt256 = 0, - balance: UInt256 = 0, - storageRoot: ByteString = Account.EmptyStorageRootHash, - codeHash: ByteString = Account.EmptyCodeHash) { + nonce: UInt256 = 0, + balance: UInt256 = 0, + storageRoot: ByteString = Account.EmptyStorageRootHash, + codeHash: ByteString = Account.EmptyCodeHash +) { def increaseBalance(value: UInt256): Account = copy(balance = balance + value) diff --git a/src/test/scala/io/iohk/ethereum/ObjectGenerators.scala b/src/test/scala/io/iohk/ethereum/ObjectGenerators.scala index aa3d6bf927..6f039fd9cc 100644 --- a/src/test/scala/io/iohk/ethereum/ObjectGenerators.scala +++ b/src/test/scala/io/iohk/ethereum/ObjectGenerators.scala @@ -4,6 +4,7 @@ import java.math.BigInteger import java.security.SecureRandom import 
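The message protocol between the two new actors is small: the scheduler actor registers itself with the downloader, asks for missing nodes, receives MissingNodes batches, and finally reports StateSyncFinished to whoever sent StartSyncingTo. A hedged wiring sketch follows, closely modelled on what StateSyncSpec later in this diff does; the actor system, peer infrastructure and blockchain instance are assumed to exist already.

import akka.actor.{ActorRef, ActorSystem}
import akka.util.ByteString
import io.iohk.ethereum.blockchain.sync.{SyncStateDownloaderActor, SyncStateScheduler, SyncStateSchedulerActor}
import io.iohk.ethereum.blockchain.sync.SyncStateSchedulerActor.StartSyncingTo
import io.iohk.ethereum.domain.Blockchain
import io.iohk.ethereum.utils.Config.SyncConfig

object StateSyncWiringSketch {
  def startStateSync(
      system: ActorSystem,
      etcPeerManager: ActorRef,
      peerEventBus: ActorRef,
      syncConfig: SyncConfig,
      blockchain: Blockchain,
      pivotStateRoot: ByteString,
      pivotBlockNumber: BigInt
  ): ActorRef = {
    val downloader = system.actorOf(
      SyncStateDownloaderActor.props(etcPeerManager, peerEventBus, syncConfig, system.scheduler)
    )
    val schedulerActor = system.actorOf(
      SyncStateSchedulerActor.props(downloader, new SyncStateScheduler(blockchain))
    )
    // Whoever sends StartSyncingTo eventually receives StateSyncFinished;
    // in the specs that sender is a TestProbe.
    schedulerActor ! StartSyncingTo(pivotStateRoot, pivotBlockNumber)
    schedulerActor
  }
}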
akka.util.ByteString +import io.iohk.ethereum.blockchain.sync.StateSyncUtils.MptNodeData import io.iohk.ethereum.crypto.ECDSASignature import io.iohk.ethereum.mpt.HexPrefix.bytesToNibbles import org.scalacheck.{Arbitrary, Gen, Shrink} @@ -11,7 +12,7 @@ import io.iohk.ethereum.mpt.{BranchNode, ExtensionNode, HashNode, LeafNode, MptN import io.iohk.ethereum.domain._ import io.iohk.ethereum.network.p2p.messages.CommonMessages.NewBlock - +// scalastyle:off trait ObjectGenerators { def noShrink[T]: Shrink[T] = Shrink[T](_ => Stream.empty) @@ -28,11 +29,13 @@ trait ObjectGenerators { def bigIntGen: Gen[BigInt] = byteArrayOfNItemsGen(32).map(b => new BigInteger(1, b)) - def randomSizeByteArrayGen(minSize: Int, maxSize: Int): Gen[Array[Byte]] = Gen.choose(minSize, maxSize).flatMap(byteArrayOfNItemsGen(_)) + def randomSizeByteArrayGen(minSize: Int, maxSize: Int): Gen[Array[Byte]] = + Gen.choose(minSize, maxSize).flatMap(byteArrayOfNItemsGen(_)) def byteArrayOfNItemsGen(n: Int): Gen[Array[Byte]] = Gen.listOfN(n, Arbitrary.arbitrary[Byte]).map(_.toArray) - def randomSizeByteStringGen(minSize: Int, maxSize: Int): Gen[ByteString] = Gen.choose(minSize, maxSize).flatMap(byteStringOfLengthNGen) + def randomSizeByteStringGen(minSize: Int, maxSize: Int): Gen[ByteString] = + Gen.choose(minSize, maxSize).flatMap(byteStringOfLengthNGen) def byteStringOfLengthNGen(n: Int): Gen[ByteString] = byteArrayOfNItemsGen(n).map(ByteString(_)) @@ -55,7 +58,7 @@ trait ObjectGenerators { for { byteStringList <- Gen.nonEmptyListOf(byteStringOfLengthNGen(size)) arrayList <- Gen.nonEmptyListOf(byteArrayOfNItemsGen(size)) - } yield byteStringList.zip(arrayList) + } yield byteStringList.zip(arrayList) } def receiptGen(): Gen[Receipt] = for { @@ -69,11 +72,13 @@ trait ObjectGenerators { logs = Seq() ) + def addressGen: Gen[Address] = byteArrayOfNItemsGen(20).map(Address(_)) + def transactionGen(): Gen[Transaction] = for { nonce <- bigIntGen gasPrice <- bigIntGen gasLimit <- bigIntGen - receivingAddress <- byteArrayOfNItemsGen(20).map(Address(_)) + receivingAddress <- addressGen value <- bigIntGen payload <- byteStringOfLengthNGen(256) } yield Transaction( @@ -88,7 +93,9 @@ trait ObjectGenerators { def receiptsGen(n: Int): Gen[Seq[Seq[Receipt]]] = Gen.listOfN(n, Gen.listOf(receiptGen())) def branchNodeGen: Gen[BranchNode] = for { - children <- Gen.listOfN(16, byteStringOfLengthNGen(32)).map(childrenList => childrenList.map(child => HashNode(child.toArray[Byte]))) + children <- Gen + .listOfN(16, byteStringOfLengthNGen(32)) + .map(childrenList => childrenList.map(child => HashNode(child.toArray[Byte]))) terminator <- byteStringOfLengthNGen(32) } yield { val branchNode = BranchNode(children.toArray, Some(terminator)) @@ -110,11 +117,11 @@ trait ObjectGenerators { value <- byteStringOfLengthNGen(32) } yield { val leafNode = LeafNode(ByteString(bytesToNibbles(keyNibbles)), value) - val asRlp = MptTraversals.encode(leafNode) + val asRlp = MptTraversals.encode(leafNode) leafNode.copy(parsedRlp = Some(asRlp)) } - def nodeGen: Gen[MptNode] = Gen.choose(0, 2).flatMap{ i => + def nodeGen: Gen[MptNode] = Gen.choose(0, 2).flatMap { i => i match { case 0 => branchNodeGen case 1 => extensionNodeGen @@ -126,8 +133,8 @@ trait ObjectGenerators { val senderKeys = crypto.generateKeyPair(secureRandom) val txsSeqGen = Gen.listOfN(length, transactionGen()) txsSeqGen.map { txs => - txs.map { - tx => SignedTransaction.sign(tx, senderKeys, chainId).tx + txs.map { tx => + SignedTransaction.sign(tx, senderKeys, chainId).tx } } } @@ -199,6 +206,20 @@ trait 
ObjectGenerators { size <- intGen(min, max) nodes <- Gen.listOfN(size, nodeGen) } yield nodes + + def genMptNodeData: Gen[MptNodeData] = for { + receivingAddress <- addressGen + code <- byteStringOfLengthNGen(10) + storageSize <- intGen(1, 100) + storage <- Gen.listOfN(storageSize, intGen(1, 5000)) + storageAsBigInts = storage.distinct.map(s => (BigInt(s), BigInt(s))) + value <- intGen(0, 2000) + } yield MptNodeData(receivingAddress, Some(code), storageAsBigInts, value) + + def genMultipleNodeData(max: Int): Gen[List[MptNodeData]] = for { + n <- intGen(1, max) + list <- Gen.listOfN(n, genMptNodeData) + } yield list } object ObjectGenerators extends ObjectGenerators diff --git a/src/test/scala/io/iohk/ethereum/blockchain/sync/EphemBlockchainTestSetup.scala b/src/test/scala/io/iohk/ethereum/blockchain/sync/EphemBlockchainTestSetup.scala index 9e53859ba9..010d0dc64f 100644 --- a/src/test/scala/io/iohk/ethereum/blockchain/sync/EphemBlockchainTestSetup.scala +++ b/src/test/scala/io/iohk/ethereum/blockchain/sync/EphemBlockchainTestSetup.scala @@ -5,14 +5,20 @@ import io.iohk.ethereum.db.storage.pruning.{ArchivePruning, PruningMode} import io.iohk.ethereum.ledger.Ledger.VMImpl import io.iohk.ethereum.nodebuilder.PruningConfigBuilder - trait EphemBlockchainTestSetup extends ScenarioSetup { + sealed trait LocalPruningConfigBuilder extends PruningConfigBuilder { override lazy val pruningMode: PruningMode = ArchivePruning } //+ cake overrides override lazy val vm: VMImpl = new VMImpl - override lazy val storagesInstance = new EphemDataSourceComponent with LocalPruningConfigBuilder with Storages.DefaultStorages + override lazy val storagesInstance = new EphemDataSourceComponent + with LocalPruningConfigBuilder + with Storages.DefaultStorages //- cake overrides + + def getNewStorages: EphemDataSourceComponent with LocalPruningConfigBuilder with Storages.DefaultStorages = { + new EphemDataSourceComponent with LocalPruningConfigBuilder with Storages.DefaultStorages + } } diff --git a/src/test/scala/io/iohk/ethereum/blockchain/sync/StateSyncSpec.scala b/src/test/scala/io/iohk/ethereum/blockchain/sync/StateSyncSpec.scala new file mode 100644 index 0000000000..ba1cfe91e9 --- /dev/null +++ b/src/test/scala/io/iohk/ethereum/blockchain/sync/StateSyncSpec.scala @@ -0,0 +1,184 @@ +package io.iohk.ethereum.blockchain.sync + +import java.net.InetSocketAddress + +import akka.actor.{ActorRef, ActorSystem} +import akka.testkit.TestActor.AutoPilot +import akka.testkit.{TestKit, TestProbe} +import io.iohk.ethereum.blockchain.sync.StateSyncUtils.TrieProvider +import io.iohk.ethereum.blockchain.sync.SyncStateSchedulerActor.{StartSyncingTo, StateSyncFinished} +import io.iohk.ethereum.domain.BlockchainImpl +import io.iohk.ethereum.network.EtcPeerManagerActor.{GetHandshakedPeers, HandshakedPeers, PeerInfo, SendMessage} +import io.iohk.ethereum.network.{Peer, PeerId} +import io.iohk.ethereum.network.PeerEventBusActor.PeerEvent.MessageFromPeer +import io.iohk.ethereum.network.p2p.messages.CommonMessages.Status +import io.iohk.ethereum.network.p2p.messages.PV63.GetNodeData.GetNodeDataEnc +import io.iohk.ethereum.network.p2p.messages.PV63.NodeData +import io.iohk.ethereum.network.p2p.messages.Versions +import io.iohk.ethereum.utils.Config +import io.iohk.ethereum.{Fixtures, ObjectGenerators, WithActorSystemShutDown} +import org.scalatest.BeforeAndAfterAll +import org.scalatest.flatspec.AnyFlatSpecLike +import org.scalatest.matchers.should.Matchers +import org.scalatestplus.scalacheck.ScalaCheckPropertyChecks + +import 
scala.concurrent.duration._ +import scala.util.Random + +class StateSyncSpec + extends TestKit(ActorSystem("MySpec")) + with AnyFlatSpecLike + with Matchers + with BeforeAndAfterAll + with ScalaCheckPropertyChecks + with WithActorSystemShutDown { + + val actorSystem = system + + "StateSync" should "sync state to different tries" in new TestSetup() { + forAll(ObjectGenerators.genMultipleNodeData(3000)) { nodeData => + val initiator = TestProbe() + val trieProvider = TrieProvider() + val target = trieProvider.buildWorld(nodeData) + setAutoPilotWithProvider(trieProvider) + initiator.send(scheduler, StartSyncingTo(target, 1)) + initiator.expectMsg(20.seconds, StateSyncFinished) + } + } + + it should "sync state to different tries when peers provide different set of data each time" in new TestSetup() { + forAll(ObjectGenerators.genMultipleNodeData(1000)) { nodeData => + val initiator = TestProbe() + val trieProvider1 = TrieProvider() + val target = trieProvider1.buildWorld(nodeData) + setAutoPilotWithProvider(trieProvider1, partialResponseConfig) + initiator.send(scheduler, StartSyncingTo(target, 1)) + initiator.expectMsg(20.seconds, StateSyncFinished) + } + } + + it should "sync state to different tries when peer provide mixed responses" in new TestSetup() { + forAll(ObjectGenerators.genMultipleNodeData(1000)) { nodeData => + val initiator = TestProbe() + val trieProvider1 = TrieProvider() + val target = trieProvider1.buildWorld(nodeData) + setAutoPilotWithProvider(trieProvider1, mixedResponseConfig) + initiator.send(scheduler, StartSyncingTo(target, 1)) + initiator.expectMsg(20.seconds, StateSyncFinished) + } + } + + class TestSetup extends EphemBlockchainTestSetup with TestSyncConfig { + override implicit lazy val system = actorSystem + type PeerConfig = Map[PeerId, PeerAction] + val syncInit = TestProbe() + + val peerStatus = Status( + protocolVersion = Versions.PV63, + networkId = 1, + totalDifficulty = BigInt(10000), + bestHash = Fixtures.Blocks.Block3125369.header.hash, + genesisHash = Fixtures.Blocks.Genesis.header.hash + ) + val initialPeerInfo = PeerInfo( + remoteStatus = peerStatus, + totalDifficulty = peerStatus.totalDifficulty, + forkAccepted = false, + maxBlockNumber = Fixtures.Blocks.Block3125369.header.number, + bestBlockHash = peerStatus.bestHash + ) + + val trieProvider = new TrieProvider(blockchain, blockchainConfig) + + val peersMap = (1 to 8).map { i => + ( + Peer(new InetSocketAddress("127.0.0.1", i), TestProbe(i.toString).ref, incomingConnection = false), + initialPeerInfo + ) + }.toMap + + sealed trait PeerAction + + case object FullResponse extends PeerAction + + case object PartialResponse extends PeerAction + + case object NoResponse extends PeerAction + + val defaultPeerConfig: PeerConfig = peersMap.map { case (peer, _) => + peer.id -> FullResponse + } + + val maxMptNodeRequest = 50 + + val partialResponseConfig: PeerConfig = peersMap.map { case (peer, _) => + peer.id -> PartialResponse + } + + val mixedResponseConfig: PeerConfig = peersMap.map { case (peer, _) => + if (peer.remoteAddress.getPort <= 3) { + peer.id -> FullResponse + } else if (peer.remoteAddress.getPort > 3 && peer.remoteAddress.getPort <= 6) { + peer.id -> PartialResponse + } else { + peer.id -> NoResponse + } + } + + val etcPeerManager = TestProbe() + + val peerEventBus = TestProbe() + + def setAutoPilotWithProvider(trieProvider: TrieProvider, peerConfig: PeerConfig = defaultPeerConfig): Unit = { + etcPeerManager.setAutoPilot(new AutoPilot { + override def run(sender: ActorRef, msg: Any): AutoPilot 
= { + msg match { + case SendMessage(msg: GetNodeDataEnc, peer) => + peerConfig(peer) match { + case FullResponse => + val responseMsg = + NodeData(trieProvider.getNodes(msg.underlyingMsg.mptElementsHashes.toList).map(_.data)) + sender ! MessageFromPeer(responseMsg, peer) + this + case PartialResponse => + val elementsToServe = Random.nextInt(maxMptNodeRequest) + val toGet = msg.underlyingMsg.mptElementsHashes.toList.take(elementsToServe) + val responseMsg = NodeData(trieProvider.getNodes(toGet).map(_.data)) + sender ! MessageFromPeer(responseMsg, peer) + this + case NoResponse => + this + } + + case GetHandshakedPeers => + sender ! HandshakedPeers(peersMap) + this + } + } + }) + } + + override lazy val syncConfig: Config.SyncConfig = defaultSyncConfig.copy( + peersScanInterval = 0.5.second, + nodesPerRequest = maxMptNodeRequest, + blacklistDuration = 1.second, + peerResponseTimeout = 1.second, + syncRetryInterval = 50.milliseconds + ) + + lazy val downloader = + system.actorOf(SyncStateDownloaderActor.props(etcPeerManager.ref, peerEventBus.ref, syncConfig, system.scheduler)) + + def buildBlockChain() = { + BlockchainImpl(getNewStorages.storages) + } + + lazy val scheduler = system.actorOf( + SyncStateSchedulerActor.props( + downloader, + new SyncStateScheduler(buildBlockChain()) + ) + ) + } + +} diff --git a/src/test/scala/io/iohk/ethereum/blockchain/sync/StateSyncUtils.scala b/src/test/scala/io/iohk/ethereum/blockchain/sync/StateSyncUtils.scala new file mode 100644 index 0000000000..8c95156041 --- /dev/null +++ b/src/test/scala/io/iohk/ethereum/blockchain/sync/StateSyncUtils.scala @@ -0,0 +1,107 @@ +package io.iohk.ethereum.blockchain.sync + +import akka.util.ByteString +import io.iohk.ethereum.blockchain.sync.SyncStateScheduler.SyncResponse +import io.iohk.ethereum.domain.{Account, Address, Blockchain, BlockchainImpl} +import io.iohk.ethereum.ledger.InMemoryWorldStateProxy +import io.iohk.ethereum.utils.{BlockchainConfig, ByteUtils} + +object StateSyncUtils extends EphemBlockchainTestSetup { + + final case class MptNodeData( + accountAddress: Address, + accountCode: Option[ByteString], + accountStorage: Seq[(BigInt, BigInt)], + accountBalance: Int + ) + + class TrieProvider(bl: Blockchain, blockchainConfig: BlockchainConfig) { + def getNodes(hashes: List[ByteString]) = { + hashes.map { hash => + val maybeResult = bl.getMptNodeByHash(hash) match { + case Some(value) => Some(ByteString(value.encode)) + case None => bl.getEvmCodeByHash(hash) + } + maybeResult match { + case Some(result) => SyncResponse(hash, result) + case None => throw new RuntimeException("Missing expected data in storage") + } + } + } + + def buildWorld(accountData: Seq[MptNodeData]): ByteString = { + val init: InMemoryWorldStateProxy = bl + .getWorldStateProxy( + blockNumber = 1, + accountStartNonce = blockchainConfig.accountStartNonce, + stateRootHash = None, + noEmptyAccounts = true, + ethCompatibleStorage = blockchainConfig.ethCompatibleStorage + ) + .asInstanceOf[InMemoryWorldStateProxy] + + val modifiedWorld = accountData.foldLeft(init) { case (world, data) => + val storage = world.getStorage(data.accountAddress) + val modifiedStorage = data.accountStorage.foldLeft(storage) { case (s, v) => + s.store(v._1, v._2) + } + val code = world.getCode(data.accountAddress) + val worldWithAccAndStorage = world + .saveAccount(data.accountAddress, Account.empty().copy(balance = data.accountBalance)) + .saveStorage(data.accountAddress, modifiedStorage) + + val finalWorld = + if (data.accountCode.isDefined) + 
worldWithAccAndStorage.saveCode(data.accountAddress, data.accountCode.get) + else + worldWithAccAndStorage + finalWorld + } + + val persisted = InMemoryWorldStateProxy.persistState(modifiedWorld) + persisted.stateRootHash + } + } + + object TrieProvider { + def apply(): TrieProvider = { + val freshStorage = getNewStorages + new TrieProvider(BlockchainImpl(freshStorage.storages), blockchainConfig) + } + } + + def createNodeDataStartingFrom(initialNumber: Int, lastNumber: Int, storageOffset: Int): Seq[MptNodeData] = { + (initialNumber until lastNumber).map { i => + val address = Address(i) + val codeBytes = ByteString(BigInt(i).toByteArray) + val storage = (initialNumber until initialNumber + storageOffset).map(s => (BigInt(s), BigInt(s))) + val balance = i + MptNodeData(address, Some(codeBytes), storage, balance) + } + } + + def checkAllDataExists(nodeData: List[MptNodeData], bl: Blockchain, blNumber: BigInt): Boolean = { + def go(remaining: List[MptNodeData]): Boolean = { + if (remaining.isEmpty) { + true + } else { + val dataToCheck = remaining.head + val address = bl.getAccount(dataToCheck.accountAddress, blNumber) + val code = address.flatMap(a => bl.getEvmCodeByHash(a.codeHash)) + + val storageCorrect = dataToCheck.accountStorage.forall { case (key, value) => + val stored = bl.getAccountStorageAt(address.get.storageRoot, key, ethCompatibleStorage = true) + ByteUtils.toBigInt(stored) == value + } + + if (address.isDefined && code.isDefined && storageCorrect) { + go(remaining.tail) + } else { + false + } + } + } + + go(nodeData) + } +} diff --git a/src/test/scala/io/iohk/ethereum/blockchain/sync/SyncControllerSpec.scala b/src/test/scala/io/iohk/ethereum/blockchain/sync/SyncControllerSpec.scala index 1b5721ecac..a6ef44cad6 100644 --- a/src/test/scala/io/iohk/ethereum/blockchain/sync/SyncControllerSpec.scala +++ b/src/test/scala/io/iohk/ethereum/blockchain/sync/SyncControllerSpec.scala @@ -1,16 +1,18 @@ package io.iohk.ethereum.blockchain.sync +import java.net.InetSocketAddress + import akka.actor.{ActorRef, ActorSystem, Props} import akka.testkit.TestActor.AutoPilot import akka.testkit.{TestActorRef, TestProbe} import akka.util.ByteString -import io.iohk.ethereum.blockchain.sync.FastSync.{StateMptNodeHash, SyncState} +import io.iohk.ethereum.blockchain.sync.FastSync.SyncState import io.iohk.ethereum.consensus.TestConsensus import io.iohk.ethereum.consensus.validators.BlockHeaderError.{HeaderParentNotFoundError, HeaderPoWError} import io.iohk.ethereum.consensus.validators.{BlockHeaderValid, BlockHeaderValidator, Validators} -import io.iohk.ethereum.domain.{Account, BlockBody, BlockHeader, Receipt} -import io.iohk.ethereum.ledger.Ledger +import io.iohk.ethereum.domain.{Account, BlockHeader, BlockBody, Receipt} import io.iohk.ethereum.ledger.Ledger.VMImpl +import io.iohk.ethereum.ledger.Ledger import io.iohk.ethereum.network.EtcPeerManagerActor.{HandshakedPeers, PeerInfo} import io.iohk.ethereum.network.PeerEventBusActor.PeerEvent.MessageFromPeer import io.iohk.ethereum.network.PeerEventBusActor.SubscriptionClassifier.{MessageClassifier, PeerDisconnectedClassifier} @@ -22,14 +24,14 @@ import io.iohk.ethereum.network.p2p.messages.PV63.{GetNodeData, GetReceipts, Nod import io.iohk.ethereum.network.{EtcPeerManagerActor, Peer} import io.iohk.ethereum.utils.Config.SyncConfig import io.iohk.ethereum.{Fixtures, Mocks} -import java.net.InetSocketAddress -import org.bouncycastle.util.encoders.Hex import org.scalamock.scalatest.MockFactory import org.scalatest.BeforeAndAfter -import 
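StateSyncUtils above is what lets the new specs run without a real network: buildWorld materialises a trie from MptNodeData, and getNodes then plays the role of a remote peer answering GetNodeData. A condensed usage sketch of these test-scope helpers follows; the account values are arbitrary.

import akka.util.ByteString
import io.iohk.ethereum.blockchain.sync.StateSyncUtils.{MptNodeData, TrieProvider}
import io.iohk.ethereum.domain.Address

object TrieProviderUsageSketch extends App {
  val provider = TrieProvider()

  // Two accounts, one with code and a single storage slot.
  val data = Seq(
    MptNodeData(Address(1), None, Seq.empty, accountBalance = 10),
    MptNodeData(Address(2), Some(ByteString(1, 2, 3)), Seq((BigInt(1), BigInt(42))), accountBalance = 20)
  )
  val stateRoot: ByteString = provider.buildWorld(data)

  // A state-syncing node would start from `stateRoot` and repeatedly ask for hashes;
  // in the specs, provider.getNodes(requestedHashes) stands in for the peer's NodeData response.
  println(provider.getNodes(List(stateRoot)).size) // 1 - the root node itself
}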
org.scalatest.flatspec.AnyFlatSpec -import org.scalatest.matchers.should.Matchers +import org.bouncycastle.util.encoders.Hex + import scala.concurrent.Await import scala.concurrent.duration._ +import org.scalatest.flatspec.AnyFlatSpec +import org.scalatest.matchers.should.Matchers // scalastyle:off file.size.limit class SyncControllerSpec extends AnyFlatSpec with Matchers with BeforeAndAfter with MockFactory { @@ -44,7 +46,7 @@ class SyncControllerSpec extends AnyFlatSpec with Matchers with BeforeAndAfter w Await.result(system.terminate(), 1.seconds) } - "SyncController" should "download pivot block and request block headers" in new TestSetup() { + "SyncController" should "download pivot block and request blockheaders" in new TestSetup() { syncController ! SyncController.Start Thread.sleep(startDelayMillis) @@ -111,20 +113,19 @@ class SyncControllerSpec extends AnyFlatSpec with Matchers with BeforeAndAfter w Thread.sleep(syncConfig.fastSyncThrottle.toMillis) sendBlockHeaders(firstNewBlock, newBlocks, peer1, newBlocks.size) - Thread.sleep(syncConfig.fastSyncThrottle.toMillis) - sendNewPivotBlock( - defaultpivotBlockHeader.copy(number = defaultpivotBlockHeader.number + 1), - peer1, - peer1Status, - handshakedPeers - ) - Thread.sleep(1.second.toMillis) sendReceipts(newBlocks.map(_.hash), newReceipts, peer1) Thread.sleep(syncConfig.fastSyncThrottle.toMillis) sendBlockBodies(newBlocks.map(_.hash), newBodies, peer1) + Thread.sleep(syncConfig.fastSyncThrottle.toMillis) + sendNewPivotBlock( + defaultpivotBlockHeader.copy(number = defaultpivotBlockHeader.number), + peer1, + peer1Status, + handshakedPeers + ) Thread.sleep(syncConfig.fastSyncThrottle.toMillis) sendNodes(Seq(defaultpivotBlockHeader.stateRoot), Seq(defaultStateMptLeafWithAccount), peer1) @@ -178,14 +179,6 @@ class SyncControllerSpec extends AnyFlatSpec with Matchers with BeforeAndAfter w Thread.sleep(syncConfig.fastSyncThrottle.toMillis) sendBlockHeaders(firstNewBlock, newBlocks, peer1, newBlocks.size) - Thread.sleep(syncConfig.fastSyncThrottle.toMillis) - sendNewPivotBlock( - defaultpivotBlockHeader.copy(number = defaultpivotBlockHeader.number + 1), - peer1, - peer1Status, - handshakedPeers - ) - Thread.sleep(1.second.toMillis) sendReceipts(newBlocks.map(_.hash), Seq(), peer1) @@ -196,6 +189,15 @@ class SyncControllerSpec extends AnyFlatSpec with Matchers with BeforeAndAfter w Thread.sleep(syncConfig.fastSyncThrottle.toMillis) sendBlockBodies(newBlocks.map(_.hash), newBodies, peer1) + Thread.sleep(syncConfig.fastSyncThrottle.toMillis) + sendNewPivotBlock( + defaultpivotBlockHeader.copy(number = defaultpivotBlockHeader.number), + peer1, + peer1Status, + handshakedPeers, + "$d" + ) + Thread.sleep(syncConfig.fastSyncThrottle.toMillis) sendNodes(Seq(defaultpivotBlockHeader.stateRoot), Seq(defaultStateMptLeafWithAccount), peer1) @@ -316,7 +318,7 @@ class SyncControllerSpec extends AnyFlatSpec with Matchers with BeforeAndAfter w syncState.receiptsQueue.isEmpty shouldBe true } - it should "update target block if target fail" in new TestSetup(_validators = new Mocks.MockValidatorsAlwaysSucceed { + it should "update pivot block if pivot fail" in new TestSetup(_validators = new Mocks.MockValidatorsAlwaysSucceed { override val blockHeaderValidator: BlockHeaderValidator = { (blockHeader, getBlockHeaderByHash) => { if (blockHeader.number != 399500 + 10) { @@ -362,7 +364,8 @@ class SyncControllerSpec extends AnyFlatSpec with Matchers with BeforeAndAfter w newBestBlockHeader, peer1, peer1Status, - handshakedPeers + handshakedPeers, + "$a" ) 
persistState() @@ -378,7 +381,7 @@ class SyncControllerSpec extends AnyFlatSpec with Matchers with BeforeAndAfter w syncState.pivotBlockUpdateFailures shouldEqual 1 } - it should "not process, out of date new target block" in new TestSetup() { + it should "not process, out of date new pivot block" in new TestSetup() { val newSafeTarget = defaultExpectedPivotBlock + syncConfig.fastSyncBlockValidationX val bestBlockNumber = defaultExpectedPivotBlock @@ -406,16 +409,24 @@ class SyncControllerSpec extends AnyFlatSpec with Matchers with BeforeAndAfter w peer1, syncConfig.blockHeadersPerRequest ) + val newReceipts = newBlocks.map(_.hash).map(_ => Seq.empty[Receipt]) + val newBodies = newBlocks.map(_ => BlockBody.empty) Thread.sleep(1.second.toMillis) + sendReceipts(newBlocks.map(_.hash), newReceipts, peer1) + + Thread.sleep(syncConfig.fastSyncThrottle.toMillis) + sendBlockBodies(newBlocks.map(_.hash), newBodies, peer1) - val newTarget = defaultpivotBlockHeader.copy(number = defaultpivotBlockHeader.number - 1) + val newPivot = defaultpivotBlockHeader.copy(number = defaultpivotBlockHeader.number - 1) + Thread.sleep(syncConfig.fastSyncThrottle.toMillis) sendNewPivotBlock( - newTarget, + newPivot, peer1, peer1Status, - handshakedPeers + handshakedPeers, + "$c" ) persistState() @@ -426,26 +437,26 @@ class SyncControllerSpec extends AnyFlatSpec with Matchers with BeforeAndAfter w Thread.sleep(syncConfig.syncRetryInterval.toMillis) - val goodTarget = newTarget.copy(number = newTarget.number + syncConfig.blockHeadersPerRequest) + val goodPivot = newPivot.copy(number = newPivot.number + syncConfig.blockHeadersPerRequest) sendNewPivotBlock( - goodTarget, + goodPivot, peer1, peer1Status, handshakedPeers, - "$b" + "$d" ) persistState() val newSyncState = storagesInstance.storages.fastSyncStateStorage.getSyncState().get - newSyncState.safeDownloadTarget shouldEqual goodTarget.number + syncConfig.fastSyncBlockValidationX - newSyncState.pivotBlock shouldEqual goodTarget + newSyncState.safeDownloadTarget shouldEqual goodPivot.number + syncConfig.fastSyncBlockValidationX + newSyncState.pivotBlock shouldEqual goodPivot newSyncState.bestBlockHeaderNumber shouldEqual bestBlockNumber + syncConfig.blockHeadersPerRequest newSyncState.pivotBlockUpdateFailures shouldEqual 1 } - it should "should start state download only when target block is fresh enough" in new TestSetup() { + it should "should start state download only when pivot block is fresh enough" in new TestSetup() { val newSafeTarget = defaultExpectedPivotBlock + syncConfig.fastSyncBlockValidationX val bestBlockNumber = defaultExpectedPivotBlock @@ -470,15 +481,25 @@ class SyncControllerSpec extends AnyFlatSpec with Matchers with BeforeAndAfter w syncConfig.blockHeadersPerRequest ) + val newReceipts = newBlocks.map(_.hash).map(_ => Seq.empty[Receipt]) + val newBodies = newBlocks.map(_ => BlockBody.empty) + Thread.sleep(1.second.toMillis) + sendReceipts(newBlocks.map(_.hash), newReceipts, peer1) - val newTarget = defaultpivotBlockHeader.copy(number = defaultExpectedPivotBlock + syncConfig.maxTargetDifference) + Thread.sleep(syncConfig.fastSyncThrottle.toMillis) + sendBlockBodies(newBlocks.map(_.hash), newBodies, peer1) + + Thread.sleep(1.second.toMillis) + + val newPivot = defaultpivotBlockHeader.copy(number = defaultExpectedPivotBlock + syncConfig.maxTargetDifference) sendNewPivotBlock( - newTarget, + newPivot, peer1, peer1Status, - handshakedPeers + handshakedPeers, + "$c" ) persistState() @@ -511,14 +532,18 @@ class SyncControllerSpec extends AnyFlatSpec with 
Matchers with BeforeAndAfter w startWithState( defaultState.copy( - bestBlockHeaderNumber = defaultExpectedPivotBlock, - pendingMptNodes = Seq(StateMptNodeHash(defaultpivotBlockHeader.stateRoot)) + bestBlockHeaderNumber = defaultExpectedPivotBlock ) ) syncController ! SyncController.Start - updateHandshakedPeers(HandshakedPeers(singlePeer)) + Thread.sleep(1000) + + etcPeerManager.send( + syncController.getSingleChild("fast-sync").getChild(Seq("state-downloader").toIterator), + HandshakedPeers(singlePeer) + ) etcPeerManager.expectMsg( EtcPeerManagerActor.SendMessage(GetNodeData(Seq(defaultpivotBlockHeader.stateRoot)), peer1.id) @@ -526,9 +551,6 @@ class SyncControllerSpec extends AnyFlatSpec with Matchers with BeforeAndAfter w peerMessageBus.expectMsg(Subscribe(MessageClassifier(Set(NodeData.code), PeerSelector.WithId(peer1.id)))) peerMessageBus.expectMsg(Unsubscribe()) - // response timeout - Thread.sleep(1.seconds.toMillis) - etcPeerManager.expectNoMessage(1.second) // wait for blacklist timeout @@ -584,10 +606,7 @@ class SyncControllerSpec extends AnyFlatSpec with Matchers with BeforeAndAfter w it should "start fast sync after restart, if fast sync was partially ran and then regular sync started" in new TestWithRegularSyncOnSetup with MockFactory { //Save previous incomplete attempt to fast sync - val syncState = SyncState( - pivotBlock = Fixtures.Blocks.Block3125369.header, - pendingMptNodes = Seq(StateMptNodeHash(ByteString("node_hash"))) - ) + val syncState = SyncState(pivotBlock = Fixtures.Blocks.Block3125369.header) storagesInstance.storages.fastSyncStateStorage.putSyncState(syncState) //Attempt to start regular sync @@ -595,11 +614,15 @@ class SyncControllerSpec extends AnyFlatSpec with Matchers with BeforeAndAfter w override lazy val syncConfig = defaultSyncConfig.copy(doFastSync = false) syncControllerWithRegularSync ! SyncController.Start - - syncControllerWithRegularSync.getSingleChild("fast-sync") ! HandshakedPeers(singlePeer) + Thread.sleep(1000) + syncControllerWithRegularSync.getSingleChild("fast-sync").getChild(Iterator("state-downloader")) ! 
HandshakedPeers( + singlePeer + ) //Fast sync node request should be received - etcPeerManager.expectMsg(EtcPeerManagerActor.SendMessage(GetNodeData(Seq(ByteString("node_hash"))), peer1.id)) + etcPeerManager.expectMsg( + EtcPeerManagerActor.SendMessage(GetNodeData(Seq(syncState.pivotBlock.stateRoot)), peer1.id) + ) } class TestSetup( @@ -652,7 +675,8 @@ class SyncControllerSpec extends AnyFlatSpec with Matchers with BeforeAndAfter w peersScanInterval = 500.milliseconds, redownloadMissingStateNodes = false, fastSyncBlockValidationX = 10, - blacklistDuration = 1.second + blacklistDuration = 1.second, + peerResponseTimeout = 2.seconds ) lazy val syncController = TestActorRef( @@ -726,12 +750,11 @@ class SyncControllerSpec extends AnyFlatSpec with Matchers with BeforeAndAfter w peer: Peer, peerStatus: Status, handshakedPeers: HandshakedPeers, - actorName: String = "$a" + actorName: String = "$c" ): Unit = { val pivotBlockSelector = syncController.getSingleChild("fast-sync").getChild(Seq(actorName).toIterator) - val updatingPeer: Map[Peer, PeerInfo] = handshakedPeers.peers .mapValues(pi => pi.copy(maxBlockNumber = pivotBlockHeader.number + syncConfig.pivotBlockOffset)) .take(1) diff --git a/src/test/scala/io/iohk/ethereum/blockchain/sync/SyncSchedulerSpec.scala b/src/test/scala/io/iohk/ethereum/blockchain/sync/SyncSchedulerSpec.scala new file mode 100644 index 0000000000..adbb4d975c --- /dev/null +++ b/src/test/scala/io/iohk/ethereum/blockchain/sync/SyncSchedulerSpec.scala @@ -0,0 +1,248 @@ +package io.iohk.ethereum.blockchain.sync + +import akka.util.ByteString +import io.iohk.ethereum.Fixtures +import io.iohk.ethereum.blockchain.sync.StateSyncUtils.{MptNodeData, TrieProvider, checkAllDataExists} +import io.iohk.ethereum.blockchain.sync.SyncStateScheduler.{ + AlreadyProcessedItem, + CannotDecodeMptNode, + NotRequestedItem, + SchedulerState, + SyncResponse +} +import io.iohk.ethereum.db.components.{EphemDataSourceComponent, Storages} +import io.iohk.ethereum.domain.{Address, BlockchainImpl} +import io.iohk.ethereum.vm.Generators.genMultipleNodeData +import org.scalatest.EitherValues +import org.scalatest.flatspec.AnyFlatSpec +import org.scalatest.matchers.must.Matchers +import org.scalatestplus.scalacheck.ScalaCheckPropertyChecks + +class SyncSchedulerSpec extends AnyFlatSpec with Matchers with EitherValues with ScalaCheckPropertyChecks { + "SyncScheduler" should "sync with mptTrie with one account (1 leaf node)" in new TestSetup { + val prov = getTrieProvider + val worldHash = prov.buildWorld(Seq(MptNodeData(Address(1), None, Seq(), 20))) + val (scheduler, schedulerBlockchain, schedulerDb) = buildScheduler() + val initialState = scheduler.initState(worldHash).get + val (missingNodes, newState) = scheduler.getMissingNodes(initialState, 1) + val responses = prov.getNodes(missingNodes) + val result = scheduler.processResponses(newState, responses) + val (newRequests, state) = scheduler.getMissingNodes(result.right.value._1, 1) + scheduler.persistBatch(state, 1) + + assert(missingNodes.size == 1) + assert(responses.size == 1) + assert(result.isRight) + assert(newRequests.isEmpty) + assert(state.numberOfPendingRequests == 0) + assert(schedulerDb.storages.nodeStorage.get(missingNodes.head).isDefined) + } + + it should "sync with mptTrie with one account with code and storage" in new TestSetup { + val prov = getTrieProvider + val worldHash = prov.buildWorld( + Seq(MptNodeData(Address(1), Some(ByteString(1, 2, 3)), Seq((1, 1)), 20)) + ) + val (scheduler, schedulerBlockchain, schedulerDb) = 
buildScheduler() + val initState = scheduler.initState(worldHash).get + val state1 = exchangeSingleNode(initState, scheduler, prov).right.value + val state2 = exchangeSingleNode(state1, scheduler, prov).right.value + val state3 = exchangeSingleNode(state2, scheduler, prov).right.value + scheduler.persistBatch(state3, 1) + + assert(state1.numberOfPendingRequests > 0) + assert(state2.numberOfPendingRequests > 0) + // only after processing third result request is finalized as code and storage of account has been retrieved + assert(state3.numberOfPendingRequests == 0) + // 1 leaf node + 1 code + 1 storage + assert(schedulerDb.dataSource.storage.size == 3) + } + + it should "sync with mptTrie with 2 accounts with different code and storage" in new TestSetup { + val prov = getTrieProvider + // root is branch with 2 leaf nodes + val worldHash = prov.buildWorld( + Seq( + MptNodeData(Address(1), Some(ByteString(1, 2, 3)), Seq((1, 1)), 20), + MptNodeData(Address(2), Some(ByteString(1, 2, 3, 4)), Seq((2, 2)), 20) + ) + ) + val (scheduler, schedulerBlockchain, schedulerDb) = buildScheduler() + val initState = scheduler.initState(worldHash).get + assert(schedulerDb.dataSource.storage.isEmpty) + val state1 = exchangeSingleNode(initState, scheduler, prov).right.value + val state2 = exchangeSingleNode(state1, scheduler, prov).right.value + val state3 = exchangeSingleNode(state2, scheduler, prov).right.value + val state4 = exchangeSingleNode(state3, scheduler, prov).right.value + val state5 = scheduler.persistBatch(state4, 1) + // finalized leaf node i.e state node + storage node + code + assert(schedulerDb.dataSource.storage.size == 3) + val state6 = exchangeSingleNode(state5, scheduler, prov).right.value + val state7 = exchangeSingleNode(state6, scheduler, prov).right.value + val state8 = exchangeSingleNode(state7, scheduler, prov).right.value + val state9 = scheduler.persistBatch(state8, 1) + + // 1 non finalized request for branch node + 2 non finalized request for leaf nodes + assert(state1.numberOfPendingRequests == 3) + + // 1 non finalized request for branch node + 2 non finalized requests for leaf nodes + 2 non finalized requests for code and + // storage + assert(state2.numberOfPendingRequests == 5) + + // 1 non finalized request for branch node + 1 non finalized request for leaf node + assert(state5.numberOfPendingRequests == 2) + + // 1 non finalized request for branch node + 1 non finalized request for leaf node + 2 non finalized request for code and storage + assert(state6.numberOfPendingRequests == 4) + + // received code and storage finalized remaining leaf node, and branch node + assert(state8.numberOfPendingRequests == 0) + // 1 branch node + 2 leaf nodes + 4 code and storage data + assert(state9.numberOfPendingRequests == 0) + assert(schedulerDb.dataSource.storage.size == 7) + } + + it should "should not request already known code or storage" in new TestSetup { + val prov = getTrieProvider + // root is branch with 2 leaf nodes, two different account with same code and same storage + val worldHash = prov.buildWorld( + Seq( + MptNodeData(Address(1), Some(ByteString(1, 2, 3)), Seq((1, 1)), 20), + MptNodeData(Address(2), Some(ByteString(1, 2, 3)), Seq((1, 1)), 20) + ) + ) + val (scheduler, schedulerBlockchain, schedulerDb) = buildScheduler() + val initState = scheduler.initState(worldHash).get + val state1 = exchangeSingleNode(initState, scheduler, prov).right.value + val (allMissingNodes1, state2) = scheduler.getAllMissingNodes(state1) + val allMissingNodes1Response = 
prov.getNodes(allMissingNodes1) + val state3 = scheduler.processResponses(state2, allMissingNodes1Response).right.value._1 + val (allMissingNodes2, state4) = scheduler.getAllMissingNodes(state3) + val allMissingNodes2Response = prov.getNodes(allMissingNodes2) + val state5 = scheduler.processResponses(state4, allMissingNodes2Response).right.value._1 + val remaingNodes = state5.numberOfPendingRequests + val state6 = scheduler.persistBatch(state5, 1) + + // 1 non finalized request for branch node + 2 non finalized request for leaf nodes + assert(state1.numberOfPendingRequests == 3) + assert(allMissingNodes1.size == 2) + + assert(allMissingNodes2.size == 2) + + assert(remaingNodes == 0) + // 1 branch node + 2 leaf node + 1 code + 1 storage (code and storage are shared by 2 leaf nodes) + assert(schedulerDb.dataSource.storage.size == 5) + } + + it should "should return error when processing unrequested response" in new TestSetup { + val prov = getTrieProvider + // root is branch with 2 leaf nodes, two different account with same code and same storage + val worldHash = prov.buildWorld( + Seq( + MptNodeData(Address(1), Some(ByteString(1, 2, 3)), Seq((1, 1)), 20), + MptNodeData(Address(2), Some(ByteString(1, 2, 3)), Seq((1, 1)), 20) + ) + ) + val (scheduler, schedulerBlockchain, schedulerDb) = buildScheduler() + val initState = scheduler.initState(worldHash).get + val (firstMissing, state1) = scheduler.getMissingNodes(initState, 1) + val result1 = scheduler.processResponse(state1, SyncResponse(ByteString(1), ByteString(2))) + assert(result1.isLeft) + assert(result1.left.value == NotRequestedItem) + } + + it should "should return error when processing already processed response" in new TestSetup { + val prov = getTrieProvider + // root is branch with 2 leaf nodes, two different account with same code and same storage + val worldHash = prov.buildWorld( + Seq( + MptNodeData(Address(1), Some(ByteString(1, 2, 3)), Seq((1, 1)), 20), + MptNodeData(Address(2), Some(ByteString(1, 2, 3)), Seq((1, 1)), 20) + ) + ) + val (scheduler, schedulerBlockchain, schedulerDb) = buildScheduler() + val initState = scheduler.initState(worldHash).get + val (firstMissing, state1) = scheduler.getMissingNodes(initState, 1) + val firstMissingResponse = prov.getNodes(firstMissing) + val result1 = scheduler.processResponse(state1, firstMissingResponse.head) + val stateAfterReceived = result1.right.value + val result2 = scheduler.processResponse(stateAfterReceived, firstMissingResponse.head) + + assert(result1.isRight) + assert(result2.isLeft) + assert(result2.left.value == AlreadyProcessedItem) + } + + it should "should return critical error when node is malformed" in new TestSetup { + val prov = getTrieProvider + // root is branch with 2 leaf nodes, two different account with same code and same storage + val worldHash = prov.buildWorld( + Seq( + MptNodeData(Address(1), Some(ByteString(1, 2, 3)), Seq((1, 1)), 20), + MptNodeData(Address(2), Some(ByteString(1, 2, 3)), Seq((1, 1)), 20) + ) + ) + val (scheduler, schedulerBlockchain, schedulerDb) = buildScheduler() + val initState = scheduler.initState(worldHash).get + val (firstMissing, state1) = scheduler.getMissingNodes(initState, 1) + val firstMissingResponse = prov.getNodes(firstMissing) + val result1 = scheduler.processResponse(state1, firstMissingResponse.head.copy(data = ByteString(1, 2, 3))) + assert(result1.isLeft) + assert(result1.left.value == CannotDecodeMptNode) + } + + // Long running test generating random mpt tries and checking that scheduler is able to correctly + // 
traverse them + it should "sync whole trie when receiving all nodes from remote side" in new TestSetup { + forAll(genMultipleNodeData(5000)) { nodeData => + val prov = getTrieProvider + val worldHash = prov.buildWorld(nodeData) + val (scheduler, schedulerBlockchain, schedulerDb) = buildScheduler() + val header = Fixtures.Blocks.ValidBlock.header.copy(stateRoot = worldHash, number = 1) + schedulerBlockchain.storeBlockHeader(header).commit() + var state = scheduler.initState(worldHash).get + while (state.activeRequest.nonEmpty) { + val (allMissingNodes1, state2) = scheduler.getAllMissingNodes(state) + val allMissingNodes1Response = prov.getNodes(allMissingNodes1) + val state3 = scheduler.processResponses(state2, allMissingNodes1Response).right.value._1 + state = state3 + } + assert(state.memBatch.nonEmpty) + val finalState = scheduler.persistBatch(state, 1) + assert(finalState.memBatch.isEmpty) + assert(finalState.activeRequest.isEmpty) + assert(finalState.queue.isEmpty) + assert(checkAllDataExists(nodeData, schedulerBlockchain, 1)) + } + } + + trait TestSetup extends EphemBlockchainTestSetup { + def getTrieProvider: TrieProvider = { + val freshStorage = getNewStorages + val freshBlockchain = BlockchainImpl(freshStorage.storages) + new TrieProvider(freshBlockchain, blockchainConfig) + } + + def buildScheduler(): ( + SyncStateScheduler, + BlockchainImpl, + EphemDataSourceComponent with LocalPruningConfigBuilder with Storages.DefaultStorages + ) = { + val freshStorage = getNewStorages + val freshBlockchain = BlockchainImpl(freshStorage.storages) + (SyncStateScheduler(freshBlockchain), freshBlockchain, freshStorage) + } + + def exchangeSingleNode( + initState: SchedulerState, + scheduler: SyncStateScheduler, + provider: TrieProvider + ): Either[SyncStateScheduler.ResponseProcessingError, SchedulerState] = { + val (missingNodes, newState) = scheduler.getMissingNodes(initState, 1) + val providedResponse = provider.getNodes(missingNodes) + scheduler.processResponses(newState, providedResponse).map(_._1) + } + + } + +} diff --git a/src/test/scala/io/iohk/ethereum/blockchain/sync/SyncSchedulerStateSpec.scala b/src/test/scala/io/iohk/ethereum/blockchain/sync/SyncSchedulerStateSpec.scala new file mode 100644 index 0000000000..cc8fa08d46 --- /dev/null +++ b/src/test/scala/io/iohk/ethereum/blockchain/sync/SyncSchedulerStateSpec.scala @@ -0,0 +1,42 @@ +package io.iohk.ethereum.blockchain.sync + +import akka.util.ByteString +import io.iohk.ethereum.blockchain.sync.SyncStateScheduler.{SchedulerState, StateNode, StateNodeRequest} +import org.scalatest.flatspec.AnyFlatSpec +import org.scalatest.matchers.must.Matchers + +class SyncSchedulerStateSpec extends AnyFlatSpec with Matchers { + "SyncSchedulerState" should "schedule node hashes for retrieval" in new TestSetup { + val stateWithRequest = schedulerState.schedule(request1) + assert(stateWithRequest != schedulerState) + assert(stateWithRequest.getPendingRequestByHash(request1.nodeHash).contains(request1)) + } + + it should "return enqueued elements in depth order" in new TestSetup { + val stateWithRequests = schedulerState.schedule(request2).schedule(request3).schedule(request1).schedule(request4) + assert(stateWithRequests != schedulerState) + val (allMissingElements, newState) = stateWithRequests.getAllMissingHashes + assert(allMissingElements == reqestsInDepthOrder.map(_.nodeHash)) + val (allMissingElements1, newState1) = newState.getAllMissingHashes + assert(allMissingElements1.isEmpty) + } + + it should "return at most n enqueued elements in depth 
order" in new TestSetup { + val stateWithRequests = schedulerState.schedule(request2).schedule(request3).schedule(request1).schedule(request4) + assert(stateWithRequests != schedulerState) + val (twoMissingElements, newState) = stateWithRequests.getMissingHashes(2) + assert(twoMissingElements == reqestsInDepthOrder.take(2).map(_.nodeHash)) + val (allMissingElements1, newState1) = newState.getAllMissingHashes + assert(allMissingElements1.size == 2) + } + + trait TestSetup extends EphemBlockchainTestSetup { + val schedulerState = SchedulerState() + val request1 = StateNodeRequest(ByteString(1), None, StateNode, Seq(), 1, 0) + val request2 = StateNodeRequest(ByteString(2), None, StateNode, Seq(), 2, 0) + val request3 = StateNodeRequest(ByteString(3), None, StateNode, Seq(), 3, 0) + val request4 = StateNodeRequest(ByteString(4), None, StateNode, Seq(), 4, 0) + + val reqestsInDepthOrder = List(request4, request3, request2, request1) + } +} diff --git a/src/test/scala/io/iohk/ethereum/blockchain/sync/SyncStateDownloaderStateSpec.scala b/src/test/scala/io/iohk/ethereum/blockchain/sync/SyncStateDownloaderStateSpec.scala new file mode 100644 index 0000000000..511e20cda7 --- /dev/null +++ b/src/test/scala/io/iohk/ethereum/blockchain/sync/SyncStateDownloaderStateSpec.scala @@ -0,0 +1,250 @@ +package io.iohk.ethereum.blockchain.sync + +import java.net.InetSocketAddress + +import akka.actor.ActorSystem +import akka.testkit.{TestKit, TestProbe} +import akka.util.ByteString +import cats.data.NonEmptyList +import io.iohk.ethereum.WithActorSystemShutDown +import io.iohk.ethereum.blockchain.sync.SyncStateDownloaderActor.{ + DownloaderState, + NoUsefulDataInResponse, + ResponseProcessingResult, + UnrequestedResponse, + UsefulData +} +import io.iohk.ethereum.blockchain.sync.SyncStateScheduler.SyncResponse +import io.iohk.ethereum.crypto.kec256 +import io.iohk.ethereum.network.Peer +import io.iohk.ethereum.network.p2p.messages.PV63.NodeData +import org.scalatest.BeforeAndAfterAll +import org.scalatest.flatspec.AnyFlatSpecLike +import org.scalatest.matchers.must.Matchers + +class SyncStateDownloaderStateSpec + extends TestKit(ActorSystem("SyncStateDownloaderStateSpec_System")) + with AnyFlatSpecLike + with Matchers + with BeforeAndAfterAll + with WithActorSystemShutDown { + + "DownloaderState" should "schedule requests for retrieval" in new TestSetup { + val newState = initialState.scheduleNewNodesForRetrieval(potentialNodesHashes) + assert(newState.nodesToGet.size == potentialNodesHashes.size) + assert(newState.nonDownloadedNodes.size == potentialNodesHashes.size) + assert(potentialNodesHashes.forall(h => newState.nodesToGet.contains(h))) + } + + it should "assign request to peers from already scheduled nodes to a max capacity" in new TestSetup { + val perPeerCapacity = 20 + val newState = initialState.scheduleNewNodesForRetrieval(potentialNodesHashes) + val (requests, newState1) = newState.assignTasksToPeers(peers, None, nodesPerPeerCapacity = perPeerCapacity) + assert(requests.size == 3) + assert(requests.forall(req => req.nodes.size == perPeerCapacity)) + assert(newState1.activeRequests.size == 3) + assert(newState1.nonDownloadedNodes.size == potentialNodesHashes.size - (peers.size * perPeerCapacity)) + assert( + requests.forall(request => request.nodes.forall(hash => newState1.nodesToGet(hash).contains(request.peer.id))) + ) + } + + it should "favour already existing requests when assigning tasks with new requests" in new TestSetup { + val perPeerCapacity = 20 + val (alreadyExistingTasks, newTasks) = 
potentialNodesHashes.splitAt(2 * perPeerCapacity)
+    val newState = initialState.scheduleNewNodesForRetrieval(alreadyExistingTasks)
+    val (requests, newState1) =
+      newState.assignTasksToPeers(peers, Some(newTasks), nodesPerPeerCapacity = perPeerCapacity)
+    assert(requests.size == 3)
+    assert(requests.forall(req => req.nodes.size == perPeerCapacity))
+    // all already existing tasks should end up in delivery
+    assert(alreadyExistingTasks.forall(hash => newState1.nodesToGet(hash).isDefined))
+    // check that the first 20 of the new nodes have been scheduled for delivery and the next 40 are waiting for an available peer
+    assert(newTasks.take(perPeerCapacity).forall(hash => newState1.nodesToGet(hash).isDefined))
+    assert(newTasks.drop(perPeerCapacity).forall(hash => newState1.nodesToGet(hash).isEmpty))
+
+    // standard check that active requests are in line with nodes in delivery
+    assert(newState1.activeRequests.size == 3)
+    assert(newState1.nonDownloadedNodes.size == potentialNodesHashes.size - (peers.size * perPeerCapacity))
+    assert(
+      requests.forall(request => request.nodes.forall(hash => newState1.nodesToGet(hash).contains(request.peer.id)))
+    )
+  }
+
+  it should "correctly handle incoming responses" in new TestSetup {
+    val perPeerCapacity = 20
+    val newState = initialState.scheduleNewNodesForRetrieval(potentialNodesHashes)
+    val (requests, newState1) = newState.assignTasksToPeers(peers, None, nodesPerPeerCapacity = perPeerCapacity)
+    assert(requests.size == 3)
+    assert(requests.forall(req => req.nodes.size == perPeerCapacity))
+
+    val (handlingResult, newState2) =
+      newState1.handleRequestSuccess(requests(0).peer, NodeData(requests(0).nodes.map(h => hashNodeMap(h)).toList))
+
+    val usefulData = expectUsefulData(handlingResult)
+    assert(usefulData.responses.size == perPeerCapacity)
+    assert(requests(0).nodes.forall(h => !newState2.nodesToGet.contains(h)))
+    assert(newState2.activeRequests.size == 2)
+
+    val (handlingResult1, newState3) =
+      newState2.handleRequestSuccess(requests(1).peer, NodeData(requests(1).nodes.map(h => hashNodeMap(h)).toList))
+    val usefulData1 = expectUsefulData(handlingResult1)
+    assert(usefulData1.responses.size == perPeerCapacity)
+    assert(requests(1).nodes.forall(h => !newState3.nodesToGet.contains(h)))
+    assert(newState3.activeRequests.size == 1)
+
+    val (handlingResult2, newState4) =
+      newState3.handleRequestSuccess(requests(2).peer, NodeData(requests(2).nodes.map(h => hashNodeMap(h)).toList))
+
+    val usefulData2 = expectUsefulData(handlingResult2)
+    assert(usefulData2.responses.size == perPeerCapacity)
+    assert(requests(2).nodes.forall(h => !newState4.nodesToGet.contains(h)))
+    assert(newState4.activeRequests.isEmpty)
+  }
+
+  it should "ignore responses from peers that were not requested" in new TestSetup {
+    val perPeerCapacity = 20
+    val newState = initialState.scheduleNewNodesForRetrieval(potentialNodesHashes)
+    val (requests, newState1) = newState.assignTasksToPeers(peers, None, nodesPerPeerCapacity = perPeerCapacity)
+    assert(requests.size == 3)
+    assert(requests.forall(req => req.nodes.size == perPeerCapacity))
+
+    val (handlingResult, newState2) =
+      newState1.handleRequestSuccess(notKnownPeer, NodeData(requests(0).nodes.map(h => hashNodeMap(h)).toList))
+    assert(handlingResult == UnrequestedResponse)
+    // check that all requests are unchanged
+    assert(newState2.activeRequests.size == 3)
+    assert(requests.forall({ req =>
+      req.nodes.forall(h => newState2.nodesToGet(h).contains(req.peer.id))
+    }))
+  }
+
+  it should "handle empty responses from peers" in new TestSetup {
+    val perPeerCapacity = 20
+    val newState = initialState.scheduleNewNodesForRetrieval(potentialNodesHashes)
+    val (requests, newState1) = newState.assignTasksToPeers(peers, None, nodesPerPeerCapacity = perPeerCapacity)
+    assert(requests.size == 3)
+    assert(requests.forall(req => req.nodes.size == perPeerCapacity))
+
+    val (handlingResult, newState2) = newState1.handleRequestSuccess(requests(0).peer, NodeData(Seq()))
+    assert(handlingResult == NoUsefulDataInResponse)
+    assert(newState2.activeRequests.size == 2)
+    // hashes are still in the download queue but they are free to be grabbed by other peers
+    assert(requests(0).nodes.forall(h => newState2.nodesToGet(h).isEmpty))
+  }
+
+  it should "handle response where part of data is malformed (bad hashes)" in new TestSetup {
+    val perPeerCapacity = 20
+    val goodResponseCap = perPeerCapacity / 2
+    val newState = initialState.scheduleNewNodesForRetrieval(potentialNodesHashes)
+    val (requests, newState1) = newState.assignTasksToPeers(
+      NonEmptyList.fromListUnsafe(List(peer1)),
+      None,
+      nodesPerPeerCapacity = perPeerCapacity
+    )
+    assert(requests.size == 1)
+    assert(requests.forall(req => req.nodes.size == perPeerCapacity))
+    val peerRequest = requests.head
+    val goodResponse = peerRequest.nodes.toList.take(perPeerCapacity / 2).map(h => hashNodeMap(h))
+    val badResponse = (200 until 210).map(ByteString(_)).toList
+    val (result, newState2) = newState1.handleRequestSuccess(requests(0).peer, NodeData(goodResponse ++ badResponse))
+
+    val usefulData = expectUsefulData(result)
+    assert(usefulData.responses.size == perPeerCapacity / 2)
+    assert(newState2.activeRequests.isEmpty)
+    // good responses were delivered and removed from the request queue
+    assert(peerRequest.nodes.toList.take(goodResponseCap).forall(h => !newState2.nodesToGet.contains(h)))
+    // bad responses have been put back into the map, but without an active peer
+    assert(peerRequest.nodes.toList.drop(goodResponseCap).forall(h => newState2.nodesToGet.contains(h)))
+    assert(peerRequest.nodes.toList.drop(goodResponseCap).forall(h => newState2.nodesToGet(h).isEmpty))
+  }
+
+  it should "handle response when there are gaps between delivered values" in new TestSetup {
+    val values = List(ByteString(1), ByteString(2), ByteString(3), ByteString(4), ByteString(5))
+    val hashes = values.map(kec256)
+    val responses = hashes.zip(values).map(s => SyncResponse(s._1, s._2))
+
+    val requested = NonEmptyList.fromListUnsafe(hashes)
+    val received = NonEmptyList.fromListUnsafe(List(values(1), values(3)))
+    val (toReschedule, delivered) = initialState.process(requested, received)
+
+    assert(toReschedule == List(hashes(4), hashes(2), hashes(0)))
+    assert(delivered == List(responses(1), responses(3)))
+  }
+
+  it should "handle response when there is a larger gap between values" in new TestSetup {
+    val values = List(ByteString(1), ByteString(2), ByteString(3), ByteString(4), ByteString(5))
+    val hashes = values.map(kec256)
+    val responses = hashes.zip(values).map(s => SyncResponse(s._1, s._2))
+
+    val requested = NonEmptyList.fromListUnsafe(hashes)
+    val received = NonEmptyList.fromListUnsafe(List(values(0), values(4)))
+    val (toReschedule, delivered) = initialState.process(requested, received)
+
+    assert(toReschedule == List(hashes(3), hashes(2), hashes(1)))
+    assert(delivered == List(responses(0), responses(4)))
+  }
+
+  it should "handle response when only last value is delivered" in new TestSetup {
+    val values = List(ByteString(1), ByteString(2), ByteString(3), ByteString(4), ByteString(5))
+    val hashes = values.map(kec256)
+    val
responses = hashes.zip(values).map(s => SyncResponse(s._1, s._2)) + + val requested = NonEmptyList.fromListUnsafe(hashes) + val received = NonEmptyList.fromListUnsafe(List(values.last)) + val (toReschedule, delivered) = initialState.process(requested, received) + + assert(toReschedule == List(hashes(3), hashes(2), hashes(1), hashes(0))) + assert(delivered == List(responses.last)) + } + + it should "handle response when only first value is delivered" in new TestSetup { + val values = List(ByteString(1), ByteString(2), ByteString(3), ByteString(4), ByteString(5)) + val hashes = values.map(kec256) + val responses = hashes.zip(values).map(s => SyncResponse(s._1, s._2)) + + val requested = NonEmptyList.fromListUnsafe(hashes) + val received = NonEmptyList.fromListUnsafe(List(values.head)) + val (toReschedule, delivered) = initialState.process(requested, received) + assert(toReschedule == List(hashes(1), hashes(2), hashes(3), hashes(4))) + assert(delivered == List(responses.head)) + } + + it should "handle response when only middle values are delivered" in new TestSetup { + val values = List(ByteString(1), ByteString(2), ByteString(3), ByteString(4), ByteString(5)) + val hashes = values.map(kec256) + val responses = hashes.zip(values).map(s => SyncResponse(s._1, s._2)) + + val requested = NonEmptyList.fromListUnsafe(hashes) + val received = NonEmptyList.fromListUnsafe(List(values(2), values(3))) + val (toReschedule, delivered) = initialState.process(requested, received) + assert(toReschedule == List(hashes(4), hashes(1), hashes(0))) + assert(delivered == List(responses(2), responses(3))) + } + + trait TestSetup { + def expectUsefulData(result: ResponseProcessingResult): UsefulData = { + result match { + case SyncStateDownloaderActor.UnrequestedResponse => fail() + case SyncStateDownloaderActor.NoUsefulDataInResponse => fail() + case data @ UsefulData(_) => data + } + } + + val ref1 = TestProbe().ref + val ref2 = TestProbe().ref + val ref3 = TestProbe().ref + val ref4 = TestProbe().ref + + val initialState = DownloaderState(Map.empty, Map.empty) + val peer1 = Peer(new InetSocketAddress("127.0.0.1", 1), ref1, incomingConnection = false) + val peer2 = Peer(new InetSocketAddress("127.0.0.1", 2), ref2, incomingConnection = false) + val peer3 = Peer(new InetSocketAddress("127.0.0.1", 3), ref3, incomingConnection = false) + val notKnownPeer = Peer(new InetSocketAddress("127.0.0.1", 4), ref4, incomingConnection = false) + val peers = NonEmptyList.fromListUnsafe(List(peer1, peer2, peer3)) + val potentialNodes = (1 to 100).map(i => ByteString(i)).toList + val potentialNodesHashes = potentialNodes.map(node => kec256(node)) + val hashNodeMap = potentialNodesHashes.zip(potentialNodes).toMap + } + +} diff --git a/src/test/scala/io/iohk/ethereum/blockchain/sync/SyncStateSpec.scala b/src/test/scala/io/iohk/ethereum/blockchain/sync/SyncStateSpec.scala deleted file mode 100644 index af1e08e986..0000000000 --- a/src/test/scala/io/iohk/ethereum/blockchain/sync/SyncStateSpec.scala +++ /dev/null @@ -1,28 +0,0 @@ -package io.iohk.ethereum.blockchain.sync - -import akka.util.ByteString -import io.iohk.ethereum.Fixtures -import io.iohk.ethereum.blockchain.sync.FastSync.{EvmCodeHash, StateMptNodeHash, SyncState} -import org.scalatest.flatspec.AnyFlatSpec -import org.scalatest.matchers.should.Matchers - -class SyncStateSpec extends AnyFlatSpec with Matchers { - - "SyncState" should "prepend mpt nodes when enqueueing them" in { - val syncState = SyncState( - pivotBlock = Fixtures.Blocks.ValidBlock.header, - pendingMptNodes 
= toStateMptNodeHash("1", "2", "3"), - pendingNonMptNodes = toEvmCodeHash("a", "b", "c") - ) - - val resultingSyncState = - syncState.addPendingNodes(toStateMptNodeHash("4", "5", "6")).addPendingNodes(toEvmCodeHash("d", "e", "f")) - - resultingSyncState.pendingMptNodes shouldEqual toStateMptNodeHash("4", "5", "6", "1", "2", "3") - resultingSyncState.pendingNonMptNodes shouldEqual toEvmCodeHash("d", "e", "f", "a", "b", "c") - } - - def toStateMptNodeHash(seq: String*): Seq[StateMptNodeHash] = seq.map(s => StateMptNodeHash(ByteString(s))) - - def toEvmCodeHash(seq: String*): Seq[EvmCodeHash] = seq.map(s => EvmCodeHash(ByteString(s))) -}
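For reference, the request/response cycle these specs exercise can be driven end to end roughly as in the sketch below. It is a minimal sketch that mirrors the loop in SyncSchedulerSpec's "sync whole trie when receiving all nodes from remote side" test and only relies on calls that appear in these tests (initState, getMissingNodes, processResponses, persistBatch, TrieProvider.getNodes); the error handling, batch size of 10, and block number 1 are illustrative assumptions, not part of the production API.

// Sketch only, assuming the signatures shown in SyncSchedulerSpec above.
import akka.util.ByteString
import io.iohk.ethereum.blockchain.sync.SyncStateScheduler
import io.iohk.ethereum.blockchain.sync.SyncStateScheduler.SchedulerState
import io.iohk.ethereum.blockchain.sync.StateSyncUtils.TrieProvider

object StateSyncLoopSketch {
  def syncTrieFromProvider(
      scheduler: SyncStateScheduler,
      provider: TrieProvider,
      stateRoot: ByteString
  ): SchedulerState = {
    var state = scheduler.initState(stateRoot).get
    while (state.activeRequest.nonEmpty) {
      // request a small batch of missing node hashes and answer it from the local provider
      val (missingNodes, requestedState) = scheduler.getMissingNodes(state, 10)
      val responses = provider.getNodes(missingNodes)
      state = scheduler
        .processResponses(requestedState, responses)
        .fold(err => throw new IllegalStateException(s"state sync failed: $err"), _._1)
    }
    // flush the accumulated in-memory batch into node storage (block number is illustrative)
    scheduler.persistBatch(state, 1)
  }
}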