From b4d1c710dcb38b58d0f7ae855cb703d8b56eff42 Mon Sep 17 00:00:00 2001 From: KonradStaniec Date: Tue, 15 Sep 2020 07:32:14 +0200 Subject: [PATCH 1/9] [ETCM-102] Add integration tests to fast sync --- build.sbt | 3 + src/it/resources/logback-test.xml | 32 ++ .../iohk/ethereum/sync/FastSyncItSpec.scala | 335 ++++++++++++++++++ .../ethereum/blockchain/sync/FastSync.scala | 8 +- .../scala/io/iohk/ethereum/network/peer.scala | 1 + 5 files changed, 377 insertions(+), 2 deletions(-) create mode 100644 src/it/resources/logback-test.xml create mode 100644 src/it/scala/io/iohk/ethereum/sync/FastSyncItSpec.scala diff --git a/build.sbt b/build.sbt index 8bed791345..57f2d82e3a 100644 --- a/build.sbt +++ b/build.sbt @@ -44,6 +44,9 @@ val dep = { "org.bouncycastle" % "bcprov-jdk15on" % "1.59", "com.typesafe.scala-logging" %% "scala-logging" % "3.9.0", "org.typelevel" %% "mouse" % "0.18", + "org.typelevel" %% "cats-core" % "2.0.0", + "org.typelevel" %% "cats-effect" % "2.0.0", + "io.monix" %% "monix" % "3.1.0", "com.twitter" %% "util-collection" % "18.5.0", "com.google.guava" % "guava" % "28.0-jre", diff --git a/src/it/resources/logback-test.xml b/src/it/resources/logback-test.xml new file mode 100644 index 0000000000..1790d678dc --- /dev/null +++ b/src/it/resources/logback-test.xml @@ -0,0 +1,32 @@ + + + + + + + + ${stdoutEncoderPattern} + + + + + ${user.home}/.mantis/logs/mantis.log + true + + ${user.home}/.mantis/logs/mantis.%i.log.zip + 1 + 10 + + + 10MB + + + ${fileEncoderPattern} + + + + + + + + diff --git a/src/it/scala/io/iohk/ethereum/sync/FastSyncItSpec.scala b/src/it/scala/io/iohk/ethereum/sync/FastSyncItSpec.scala new file mode 100644 index 0000000000..b8d7d33373 --- /dev/null +++ b/src/it/scala/io/iohk/ethereum/sync/FastSyncItSpec.scala @@ -0,0 +1,335 @@ +package io.iohk.ethereum.sync + +import java.net.{InetSocketAddress, ServerSocket} +import java.util.concurrent.TimeoutException +import java.util.concurrent.atomic.AtomicReference + +import akka.actor.{ActorRef, ActorSystem} +import akka.testkit.TestProbe +import akka.util.{ByteString, Timeout} +import cats.effect.Resource +import io.iohk.ethereum.Mocks.MockValidatorsAlwaysSucceed +import io.iohk.ethereum.blockchain.sync.{BlockchainHostActor, FastSync, TestSyncConfig} +import io.iohk.ethereum.db.components.{SharedEphemDataSources, Storages} +import io.iohk.ethereum.db.storage.AppStateStorage +import io.iohk.ethereum.db.storage.pruning.{ArchivePruning, PruningMode} +import io.iohk.ethereum.domain.{Block, Blockchain, BlockchainImpl} +import io.iohk.ethereum.mpt.MerklePatriciaTrie +import io.iohk.ethereum.network.EtcPeerManagerActor.PeerInfo +import io.iohk.ethereum.network.PeerManagerActor.{FastSyncHostConfiguration, PeerConfiguration} +import io.iohk.ethereum.network.discovery.Node +import io.iohk.ethereum.network.discovery.PeerDiscoveryManager.{DiscoveredNodesInfo, DiscoveryNodeInfo} +import io.iohk.ethereum.network.handshaker.{EtcHandshaker, EtcHandshakerConfiguration, Handshaker} +import io.iohk.ethereum.network.p2p.EthereumMessageDecoder +import io.iohk.ethereum.network.rlpx.AuthHandshaker +import io.iohk.ethereum.network.rlpx.RLPxConnectionHandler.RLPxConfiguration +import io.iohk.ethereum.network.{EtcPeerManagerActor, ForkResolver, KnownNodesManager, PeerEventBusActor, PeerManagerActor, ServerActor} +import io.iohk.ethereum.nodebuilder.{PruningConfigBuilder, SecureRandomBuilder} +import io.iohk.ethereum.sync.FastSyncItSpec.{FakePeer, customTestCaseResourceM} +import io.iohk.ethereum.utils.ServerStatus.Listening +import io.iohk.ethereum.utils.{Config, NodeStatus, ServerStatus, VmConfig} +import io.iohk.ethereum.vm.EvmConfig +import io.iohk.ethereum.{Fixtures, Timeouts} +import monix.eval.Task +import monix.execution.Scheduler +import org.scalatest.{Assertion, AsyncFlatSpec, BeforeAndAfter, Matchers} + +import scala.concurrent.Future +import scala.concurrent.duration._ + +class FastSyncItSpec extends AsyncFlatSpec with Matchers with BeforeAndAfter { + implicit val testScheduler = Scheduler.fixedPool("test", 16) + + "FastSync" should "should sync blockchain without state nodes" in customTestCaseResourceM(FakePeer.start3FakePeersRes()) { + case (peer1, peer2, peer3) => + for { + _ <- Task.parZip3(peer1.startPeer(), peer2.startPeer(), peer3.startPeer()) + _ <- peer2.saveNBlocks(1000) + _ <- peer3.saveNBlocks(1000) + _ <- peer1.connectToPeers(Set(peer2.node, peer3.node)) + _ <- peer1.startFastSync() + _ <- peer1.waitForFastSyncFinish() + } yield { + assert(peer1.bl.getBestBlockNumber() == peer2.bl.getBestBlockNumber() - peer2.syncConfig.targetBlockOffset) + assert(peer1.bl.getBestBlockNumber() == peer3.bl.getBestBlockNumber() - peer3.syncConfig.targetBlockOffset) + } + } + +} + +object FastSyncItSpec { + private def retryUntilWithDelay[A](source: Task[A], delay: FiniteDuration, maxRetries: Int)( + predicate: A => Boolean + ): Task[A] = { + source.delayExecution(delay).flatMap { result => + if (predicate(result)) { + Task.now(result) + } else { + if (maxRetries > 0) { + retryUntilWithDelay(source, delay, maxRetries - 1)(predicate) + } else { + Task.raiseError(new TimeoutException("Task time out after all retries")) + } + } + } + } + + def randomAddress(): InetSocketAddress = { + val s = new ServerSocket(0) + try { + new InetSocketAddress("localhost", s.getLocalPort) + } finally { + s.close() + } + } + + def customTestCaseResourceM[T](fixture: Resource[Task, T]) + (theTest: T => Task[Assertion])(implicit s: Scheduler): Future[Assertion] = { + fixture.use(theTest).runToFuture + } + + def generateBlockChain(startBlock: Block, number: Int): Seq[Block] = { + def recur(last: Block, blocksLeft: Int, blocksCreated: List[Block]): List[Block] = { + if (blocksLeft <= 0) { + blocksCreated.reverse + } else { + val newBlock = last.copy(header = last.header.copy(parentHash = last.header.hash, number = last.header.number + 1)) + recur(newBlock, blocksLeft - 1, newBlock :: blocksCreated) + } + } + recur(startBlock, number, List.empty) + } + + class FakePeer(peerName: String) extends SecureRandomBuilder with TestSyncConfig { + implicit val akkaTimeout: Timeout = Timeout(5.second) + + val config = Config.config + + import scala.language.postfixOps + + implicit val system = ActorSystem(peerName) + + val peerDiscoveryManager = TestProbe().ref + + val nodeKey = io.iohk.ethereum.crypto.generateKeyPair(secureRandom) + + private val nodeStatus = + NodeStatus( + key = nodeKey, + serverStatus = ServerStatus.NotListening, + discoveryStatus = ServerStatus.NotListening + ) + + sealed trait LocalPruningConfigBuilder extends PruningConfigBuilder { + override lazy val pruningMode: PruningMode = ArchivePruning + } + + lazy val nodeStatusHolder = new AtomicReference(nodeStatus) + lazy val storagesInstance = new SharedEphemDataSources with LocalPruningConfigBuilder with Storages.DefaultStorages + lazy val blockchainConfig = Config.blockchains.blockchainConfig + /** + * Default persist interval is 20s, which is too long for tests. As in all tests we treat peer as connected when + * it is persisted in storage. + */ + lazy val knownNodesManagerConfig = + KnownNodesManager.KnownNodesManagerConfig(config).copy(persistInterval = 1.seconds) + + lazy val knownNodesManager = system.actorOf( + KnownNodesManager.props( + knownNodesManagerConfig, + storagesInstance.storages.knownNodesStorage + ) + ) + + val bl = BlockchainImpl(storagesInstance.storages) + + val genesis = Block(Fixtures.Blocks.Genesis.header.copy(stateRoot = ByteString(MerklePatriciaTrie.EmptyRootHash) ), Fixtures.Blocks.Genesis.body) + + bl.save(genesis, Seq(), genesis.header.difficulty, saveAsBestBlock = true) + + lazy val nh = nodeStatusHolder + + val peerConf = new PeerConfiguration { + override val fastSyncHostConfiguration: FastSyncHostConfiguration = new FastSyncHostConfiguration { + val maxBlocksHeadersPerMessage: Int = 200 + val maxBlocksBodiesPerMessage: Int = 200 + val maxReceiptsPerMessage: Int = 200 + val maxMptComponentsPerMessage: Int = 200 + } + override val rlpxConfiguration: RLPxConfiguration = new RLPxConfiguration { + override val waitForTcpAckTimeout: FiniteDuration = Timeouts.normalTimeout + override val waitForHandshakeTimeout: FiniteDuration = Timeouts.normalTimeout + } + override val waitForHelloTimeout: FiniteDuration = 3 seconds + override val waitForStatusTimeout: FiniteDuration = 30 seconds + override val waitForChainCheckTimeout: FiniteDuration = 15 seconds + override val connectMaxRetries: Int = 3 + override val connectRetryDelay: FiniteDuration = 1 second + override val disconnectPoisonPillTimeout: FiniteDuration = 3 seconds + override val maxOutgoingPeers = 10 + override val maxIncomingPeers = 5 + override val maxPendingPeers = 5 + override val networkId: Int = 1 + + override val updateNodesInitialDelay: FiniteDuration = 5.seconds + override val updateNodesInterval: FiniteDuration = 20.seconds + override val shortBlacklistDuration: FiniteDuration = 1.minute + override val longBlacklistDuration: FiniteDuration = 3.minutes + } + + lazy val peerEventBus = system.actorOf(PeerEventBusActor.props, "peer-event-bus") + + private val handshakerConfiguration: EtcHandshakerConfiguration = + new EtcHandshakerConfiguration { + override val forkResolverOpt: Option[ForkResolver] = None + override val nodeStatusHolder: AtomicReference[NodeStatus] = nh + override val peerConfiguration: PeerConfiguration = peerConf + override val blockchain: Blockchain = bl + override val appStateStorage: AppStateStorage = storagesInstance.storages.appStateStorage + } + + lazy val handshaker: Handshaker[PeerInfo] = EtcHandshaker(handshakerConfiguration) + + lazy val authHandshaker: AuthHandshaker = AuthHandshaker(nodeKey, secureRandom) + + lazy val peerManager: ActorRef = system.actorOf(PeerManagerActor.props( + peerDiscoveryManager, + Config.Network.peer, + peerEventBus, + knownNodesManager, + handshaker, + authHandshaker, + EthereumMessageDecoder + ), "peer-manager") + + lazy val etcPeerManager: ActorRef = system.actorOf(EtcPeerManagerActor.props( + peerManager, peerEventBus, storagesInstance.storages.appStateStorage, None), "etc-peer-manager") + + val blockchainHost: ActorRef = system.actorOf(BlockchainHostActor.props( + bl, peerConf, peerEventBus, etcPeerManager), "blockchain-host") + + lazy val server: ActorRef = system.actorOf(ServerActor.props(nodeStatusHolder, peerManager), "server") + + val listenAddress = randomAddress() + + lazy val node = + DiscoveryNodeInfo(Node(ByteString(nodeStatus.nodeId), listenAddress.getAddress, listenAddress.getPort, listenAddress.getPort), 1) + + lazy val vmConfig = VmConfig(Config.config) + + lazy val validators= new MockValidatorsAlwaysSucceed + + val testSyncConfig = syncConfig.copy( + minPeersToChooseTargetBlock = 1, + peersScanInterval = 1.second, + blockHeadersPerRequest = 200, + blockBodiesPerRequest = 50, + receiptsPerRequest = 50, + fastSyncThrottle = 10.milliseconds, + startRetryInterval = 50.milliseconds, + ) + + lazy val fastSync = system.actorOf(FastSync.props( + storagesInstance.storages.fastSyncStateStorage, + storagesInstance.storages.appStateStorage, + bl, + validators, + peerEventBus, + etcPeerManager, + testSyncConfig, + system.scheduler + )) + + def getMptForBlock(blockHeaderNumber: BigInt) = { + bl.getWorldStateProxy( + blockNumber = blockHeaderNumber, + accountStartNonce = blockchainConfig.accountStartNonce, + stateRootHash = bl.getBlockByNumber(blockHeaderNumber).map(_.header.stateRoot), + noEmptyAccounts = EvmConfig.forBlock(blockHeaderNumber, blockchainConfig).noEmptyAccounts, + ethCompatibleStorage = blockchainConfig.ethCompatibleStorage + ) + } + + def startPeer(): Task[Unit] = { + for { + _ <- Task { + peerManager ! PeerManagerActor.StartConnecting + server ! ServerActor.StartServer(listenAddress) + } + _ <- retryUntilWithDelay(Task(nodeStatusHolder.get()), 1.second, 5) {status => + status.serverStatus == Listening(listenAddress) + } + } yield () + } + + def shutdown(): Task[Unit] = { + Task.deferFuture(system.terminate()).map(_ => ()) + } + + def connectToPeers(nodes: Set[DiscoveryNodeInfo]): Task[Unit] = { + for { + _ <- Task { + peerManager ! DiscoveredNodesInfo(nodes) + } + _ <- retryUntilWithDelay(Task(storagesInstance.storages.knownNodesStorage.getKnownNodes()), 1.second, 5){ knownNodes => + val requestedNodes = nodes.map(_.node.id) + val currentNodes = knownNodes.map(Node.fromUri).map(_.id) + requestedNodes.subsetOf(currentNodes) + } + } yield () + } + + import akka.pattern.ask + def getHandshakedPeers: Task[PeerManagerActor.Peers] = { + Task.deferFutureAction{s => + implicit val ec = s + (peerManager ? PeerManagerActor.GetPeers).mapTo[PeerManagerActor.Peers] + } + } + + def saveNBlocks(n: Int) = Task { + val lastBlock = bl.getBestBlock() + val chain = generateBlockChain(lastBlock, n) + chain.foreach(block => bl.save(block, Seq(), block.header.difficulty, true)) + } + + def startFastSync(): Task[Unit] = Task { + fastSync ! FastSync.Start + } + + def waitForFastSyncFinish(): Task[Boolean] = { + retryUntilWithDelay(Task(storagesInstance.storages.appStateStorage.isFastSyncDone()), 1.second, 30){ isDone => + isDone + } + } + } + + object FakePeer { + def startFakePeer(peerName: String): Task[FakePeer] = { + for { + peer <- Task(new FakePeer(peerName)).memoizeOnSuccess + _ <- peer.startPeer() + } yield peer + } + + def start1FakePeerRes(): Resource[Task, FakePeer] = { + Resource.make { + startFakePeer("Peer1") + } { peer => + peer.shutdown() + } + } + + def start2FakePeersRes() = { + Resource.make { + Task.parZip2(startFakePeer("Peer1"), startFakePeer("Peer2")) + } { case (peer, peer1) => Task.parMap2(peer.shutdown(), peer1.shutdown())((_ ,_)=> ())} + } + + def start3FakePeersRes() = { + Resource.make { + Task.parZip3( startFakePeer("Peer1"), startFakePeer("Peer2"), startFakePeer("Peer3")) + } { case (peer, peer1, peer2) => Task.parMap3(peer.shutdown(), peer1.shutdown(), peer2.shutdown())((_ ,_, _)=> ())} + } + } +} \ No newline at end of file diff --git a/src/main/scala/io/iohk/ethereum/blockchain/sync/FastSync.scala b/src/main/scala/io/iohk/ethereum/blockchain/sync/FastSync.scala index 3d4f046f6b..f335cdac38 100644 --- a/src/main/scala/io/iohk/ethereum/blockchain/sync/FastSync.scala +++ b/src/main/scala/io/iohk/ethereum/blockchain/sync/FastSync.scala @@ -11,7 +11,7 @@ import io.iohk.ethereum.consensus.validators.Validators import io.iohk.ethereum.crypto.kec256 import io.iohk.ethereum.db.storage.{AppStateStorage, FastSyncStateStorage} import io.iohk.ethereum.domain._ -import io.iohk.ethereum.mpt.{BranchNode, ExtensionNode, HashNode, LeafNode, MptNode} +import io.iohk.ethereum.mpt.{BranchNode, ExtensionNode, HashNode, LeafNode, MerklePatriciaTrie, MptNode} import io.iohk.ethereum.network.Peer import io.iohk.ethereum.network.p2p.messages.PV62._ import io.iohk.ethereum.network.p2p.messages.PV63.MptNodeEncoders._ @@ -208,7 +208,11 @@ class FastSync( case ImportedLastBlock => if (targetBlockHeader.number - syncState.targetBlock.number <= syncConfig.maxTargetDifference) { log.info(s"Current target block is fresh enough, starting state download") - syncState = syncState.copy(pendingMptNodes = Seq(StateMptNodeHash(syncState.targetBlock.stateRoot))) + if (syncState.targetBlock.stateRoot == ByteString(MerklePatriciaTrie.EmptyRootHash)) { + syncState = syncState.copy(pendingMptNodes = Seq()) + } else { + syncState = syncState.copy(pendingMptNodes = Seq(StateMptNodeHash(syncState.targetBlock.stateRoot))) + } } else { syncState = syncState.updateTargetBlock(targetBlockHeader, syncConfig.fastSyncBlockValidationX, updateFailures = false) log.info(s"Changing target block to ${targetBlockHeader.number}, new safe target is ${syncState.safeDownloadTarget}") diff --git a/src/main/scala/io/iohk/ethereum/network/peer.scala b/src/main/scala/io/iohk/ethereum/network/peer.scala index 1aa7be75de..64b1777786 100644 --- a/src/main/scala/io/iohk/ethereum/network/peer.scala +++ b/src/main/scala/io/iohk/ethereum/network/peer.scala @@ -8,5 +8,6 @@ import io.iohk.ethereum.blockchain.sync.BlacklistSupport.BlackListId case class PeerId(value: String) extends BlackListId case class Peer(remoteAddress: InetSocketAddress, ref: ActorRef, incomingConnection: Boolean) { + // FIXME PeerId should be actual peerId i.e id derived form node public key def id: PeerId = PeerId(ref.path.name) } From 1238df41c29dc502c024b4aace1d52f42a3e65aa Mon Sep 17 00:00:00 2001 From: KonradStaniec Date: Tue, 15 Sep 2020 13:26:05 +0200 Subject: [PATCH 2/9] [ETCM-102] Add integration tests to fast sync --- .../iohk/ethereum/sync/FastSyncItSpec.scala | 117 +++++++++++++----- 1 file changed, 88 insertions(+), 29 deletions(-) diff --git a/src/it/scala/io/iohk/ethereum/sync/FastSyncItSpec.scala b/src/it/scala/io/iohk/ethereum/sync/FastSyncItSpec.scala index b8d7d33373..f1c09f220b 100644 --- a/src/it/scala/io/iohk/ethereum/sync/FastSyncItSpec.scala +++ b/src/it/scala/io/iohk/ethereum/sync/FastSyncItSpec.scala @@ -13,8 +13,9 @@ import io.iohk.ethereum.blockchain.sync.{BlockchainHostActor, FastSync, TestSync import io.iohk.ethereum.db.components.{SharedEphemDataSources, Storages} import io.iohk.ethereum.db.storage.AppStateStorage import io.iohk.ethereum.db.storage.pruning.{ArchivePruning, PruningMode} -import io.iohk.ethereum.domain.{Block, Blockchain, BlockchainImpl} -import io.iohk.ethereum.mpt.MerklePatriciaTrie +import io.iohk.ethereum.domain.{Account, Address, Block, Blockchain, BlockchainImpl} +import io.iohk.ethereum.ledger.InMemoryWorldStateProxy +import io.iohk.ethereum.mpt.{HashNode, MerklePatriciaTrie, MptNode, MptTraversals} import io.iohk.ethereum.network.EtcPeerManagerActor.PeerInfo import io.iohk.ethereum.network.PeerManagerActor.{FastSyncHostConfiguration, PeerConfiguration} import io.iohk.ethereum.network.discovery.Node @@ -25,7 +26,7 @@ import io.iohk.ethereum.network.rlpx.AuthHandshaker import io.iohk.ethereum.network.rlpx.RLPxConnectionHandler.RLPxConfiguration import io.iohk.ethereum.network.{EtcPeerManagerActor, ForkResolver, KnownNodesManager, PeerEventBusActor, PeerManagerActor, ServerActor} import io.iohk.ethereum.nodebuilder.{PruningConfigBuilder, SecureRandomBuilder} -import io.iohk.ethereum.sync.FastSyncItSpec.{FakePeer, customTestCaseResourceM} +import io.iohk.ethereum.sync.FastSyncItSpec.{FakePeer, IdentityUpdate, customTestCaseResourceM, updateStateAtBlock} import io.iohk.ethereum.utils.ServerStatus.Listening import io.iohk.ethereum.utils.{Config, NodeStatus, ServerStatus, VmConfig} import io.iohk.ethereum.vm.EvmConfig @@ -36,6 +37,7 @@ import org.scalatest.{Assertion, AsyncFlatSpec, BeforeAndAfter, Matchers} import scala.concurrent.Future import scala.concurrent.duration._ +import scala.util.Try class FastSyncItSpec extends AsyncFlatSpec with Matchers with BeforeAndAfter { implicit val testScheduler = Scheduler.fixedPool("test", 16) @@ -43,9 +45,8 @@ class FastSyncItSpec extends AsyncFlatSpec with Matchers with BeforeAndAfter { "FastSync" should "should sync blockchain without state nodes" in customTestCaseResourceM(FakePeer.start3FakePeersRes()) { case (peer1, peer2, peer3) => for { - _ <- Task.parZip3(peer1.startPeer(), peer2.startPeer(), peer3.startPeer()) - _ <- peer2.saveNBlocks(1000) - _ <- peer3.saveNBlocks(1000) + _ <- peer2.importNBlocksToTheTopForm(peer2.getCurrentState(), 1000)(IdentityUpdate) + _ <- peer3.importNBlocksToTheTopForm(peer3.getCurrentState(), 1000)(IdentityUpdate) _ <- peer1.connectToPeers(Set(peer2.node, peer3.node)) _ <- peer1.startFastSync() _ <- peer1.waitForFastSyncFinish() @@ -55,6 +56,21 @@ class FastSyncItSpec extends AsyncFlatSpec with Matchers with BeforeAndAfter { } } + it should "should sync blockchain with state nodes" in customTestCaseResourceM(FakePeer.start3FakePeersRes()) { + case (peer1, peer2, peer3) => + for { + _ <- peer2.importNBlocksToTheTopForm(peer2.getCurrentState(), 1000)(updateStateAtBlock(500)) + _ <- peer3.importNBlocksToTheTopForm(peer3.getCurrentState(), 1000)(updateStateAtBlock(500)) + _ <- peer1.connectToPeers(Set(peer2.node, peer3.node)) + _ <- peer1.startFastSync() + _ <- peer1.waitForFastSyncFinish() + } yield { + val trie = peer1.getBestBlockTrie() + assert(peer1.bl.getBestBlockNumber() == peer2.bl.getBestBlockNumber() - peer2.syncConfig.targetBlockOffset) + assert(peer1.bl.getBestBlockNumber() == peer3.bl.getBestBlockNumber() - peer3.syncConfig.targetBlockOffset) + assert(trie.isDefined) + } + } } object FastSyncItSpec { @@ -88,16 +104,31 @@ object FastSyncItSpec { fixture.use(theTest).runToFuture } - def generateBlockChain(startBlock: Block, number: Int): Seq[Block] = { - def recur(last: Block, blocksLeft: Int, blocksCreated: List[Block]): List[Block] = { - if (blocksLeft <= 0) { - blocksCreated.reverse - } else { - val newBlock = last.copy(header = last.header.copy(parentHash = last.header.hash, number = last.header.number + 1)) - recur(newBlock, blocksLeft - 1, newBlock :: blocksCreated) - } + final case class BlockchainState(bestBlock: Block, currentWorldState: InMemoryWorldStateProxy, currentTd: BigInt) + + val IdentityUpdate: (BigInt, InMemoryWorldStateProxy) => InMemoryWorldStateProxy = (_, world) => world + + def updateWorldWithNRandomAcounts(n:Int, world: InMemoryWorldStateProxy): InMemoryWorldStateProxy = { + val resultWorld = (0 until n).foldLeft(world) { (world, num) => + val randomBalance = num + val randomAddress = Address(num) + val codeBytes = BigInt(num).toByteArray + val storage = world.getStorage(randomAddress) + val changedStorage = (num until num + 20).foldLeft(storage)((storage, value) => storage.store(value, value)) + world + .saveAccount(randomAddress, Account.empty().copy(balance = randomBalance)) + .saveCode(randomAddress, ByteString(codeBytes)) + .saveStorage(randomAddress, changedStorage) + } + InMemoryWorldStateProxy.persistState(resultWorld) + } + + def updateStateAtBlock(blockWithUpdate: BigInt): (BigInt, InMemoryWorldStateProxy) => InMemoryWorldStateProxy = { (blockNr: BigInt, world: InMemoryWorldStateProxy) => + if (blockNr == blockWithUpdate) { + updateWorldWithNRandomAcounts(1000, world) + } else { + IdentityUpdate(blockNr, world) } - recur(startBlock, number, List.empty) } class FakePeer(peerName: String) extends SecureRandomBuilder with TestSyncConfig { @@ -227,6 +258,7 @@ object FastSyncItSpec { receiptsPerRequest = 50, fastSyncThrottle = 10.milliseconds, startRetryInterval = 50.milliseconds, + nodesPerRequest = 200 ) lazy val fastSync = system.actorOf(FastSync.props( @@ -240,16 +272,23 @@ object FastSyncItSpec { system.scheduler )) - def getMptForBlock(blockHeaderNumber: BigInt) = { + private def getMptForBlock(block: Block) = { bl.getWorldStateProxy( - blockNumber = blockHeaderNumber, + blockNumber = block.number, accountStartNonce = blockchainConfig.accountStartNonce, - stateRootHash = bl.getBlockByNumber(blockHeaderNumber).map(_.header.stateRoot), - noEmptyAccounts = EvmConfig.forBlock(blockHeaderNumber, blockchainConfig).noEmptyAccounts, + stateRootHash = Some(block.header.stateRoot), + noEmptyAccounts = EvmConfig.forBlock(block.number, blockchainConfig).noEmptyAccounts, ethCompatibleStorage = blockchainConfig.ethCompatibleStorage ) } + def getCurrentState(): BlockchainState = { + val bestBlock = bl.getBestBlock() + val currentWorldState = getMptForBlock(bestBlock) + val currentTd = bl.getTotalDifficultyByHash(bestBlock.hash).get + BlockchainState(bestBlock, currentWorldState, currentTd) + } + def startPeer(): Task[Unit] = { for { _ <- Task { @@ -279,18 +318,29 @@ object FastSyncItSpec { } yield () } - import akka.pattern.ask - def getHandshakedPeers: Task[PeerManagerActor.Peers] = { - Task.deferFutureAction{s => - implicit val ec = s - (peerManager ? PeerManagerActor.GetPeers).mapTo[PeerManagerActor.Peers] - } + private def createChildBlock(parent: Block, parentTd: BigInt, parentWorld: InMemoryWorldStateProxy) + (updateWorldForBlock: (BigInt, InMemoryWorldStateProxy) => InMemoryWorldStateProxy): (Block, BigInt, InMemoryWorldStateProxy) = { + val newBlockNumber = parent.header.number + 1 + val newWorld = updateWorldForBlock(newBlockNumber, parentWorld) + val newBlock = parent.copy(header = parent.header.copy(parentHash = parent.header.hash, number = newBlockNumber, stateRoot = newWorld.stateRootHash)) + val newTd = newBlock.header.difficulty + parentTd + (newBlock, newTd, parentWorld) } - def saveNBlocks(n: Int) = Task { - val lastBlock = bl.getBestBlock() - val chain = generateBlockChain(lastBlock, n) - chain.foreach(block => bl.save(block, Seq(), block.header.difficulty, true)) + def importNBlocksToTheTopForm(startState: BlockchainState, n: Int) + (updateWorldForBlock: (BigInt, InMemoryWorldStateProxy) => InMemoryWorldStateProxy): Task[Unit] = { + def go(parent: Block, parentTd: BigInt, parentWorld: InMemoryWorldStateProxy, blocksLeft: Int): Task[Unit] = { + if (blocksLeft <= 0) { + Task.now(()) + } else { + val (newBlock, newTd, newWorld) = createChildBlock(parent, parentTd, parentWorld)(updateWorldForBlock) + bl.save(newBlock, Seq(), newTd, saveAsBestBlock = true) + bl.persistCachedNodes() + go(newBlock, newTd, newWorld, blocksLeft - 1) + } + } + + go(startState.bestBlock, startState.currentTd, startState.currentWorldState, n) } def startFastSync(): Task[Unit] = Task { @@ -302,6 +352,15 @@ object FastSyncItSpec { isDone } } + + // Reads whole trie into memory, if the trie lacks nodes in storage it will be None + def getBestBlockTrie(): Option[MptNode] = { + Try { + val bestBlock = bl.getBestBlock() + val bestStateRoot =bestBlock.header.stateRoot + MptTraversals.parseTrieIntoMemory(HashNode(bestStateRoot.toArray), storagesInstance.storages.stateStorage.getBackingStorage(bestBlock.number)) + }.toOption + } } object FakePeer { From 72da7a39c35b262751d9588cf06da038d7946245 Mon Sep 17 00:00:00 2001 From: KonradStaniec Date: Wed, 16 Sep 2020 11:34:54 +0200 Subject: [PATCH 3/9] [ETCM-102] Fix bugs found during integration testing --- src/it/resources/logback-test.xml | 2 +- .../iohk/ethereum/sync/FastSyncItSpec.scala | 99 +++++++++++++--- .../sync/FastSyncTargetBlockSelector.scala | 4 +- .../blockchain/sync/PeersClient.scala | 2 +- .../sync/regular/OldRegularSync.scala | 2 +- .../db/dataSource/EphemDataSource.scala | 13 ++- .../network/EtcPeerManagerActor.scala | 45 +++++--- .../EtcForkBlockExchangeState.scala | 4 +- .../EtcNodeStatusExchangeState.scala | 2 +- src/test/scala/io/iohk/ethereum/Mocks.scala | 2 +- .../blockchain/sync/BlockBroadcastSpec.scala | 3 +- .../blockchain/sync/SyncControllerSpec.scala | 14 +-- .../sync/regular/OldRegularSyncSpec.scala | 2 +- .../sync/regular/RegularSyncFixtures.scala | 2 +- .../ethereum/jsonrpc/DebugServiceSpec.scala | 3 +- .../jsonrpc/JsonRpcControllerSpec.scala | 3 +- .../ethereum/network/EtcPeerManagerSpec.scala | 106 ++++++------------ .../network/PeerActorHandshakingSpec.scala | 2 +- .../network/PeerEventBusActorSpec.scala | 3 +- .../ethereum/network/PeerManagerSpec.scala | 3 +- .../handshaker/EtcHandshakerSpec.scala | 9 +- 21 files changed, 189 insertions(+), 136 deletions(-) diff --git a/src/it/resources/logback-test.xml b/src/it/resources/logback-test.xml index 1790d678dc..ee14d8afd2 100644 --- a/src/it/resources/logback-test.xml +++ b/src/it/resources/logback-test.xml @@ -25,7 +25,7 @@ - + diff --git a/src/it/scala/io/iohk/ethereum/sync/FastSyncItSpec.scala b/src/it/scala/io/iohk/ethereum/sync/FastSyncItSpec.scala index f1c09f220b..743a9aea56 100644 --- a/src/it/scala/io/iohk/ethereum/sync/FastSyncItSpec.scala +++ b/src/it/scala/io/iohk/ethereum/sync/FastSyncItSpec.scala @@ -1,6 +1,7 @@ package io.iohk.ethereum.sync import java.net.{InetSocketAddress, ServerSocket} +import java.nio.file.Files import java.util.concurrent.TimeoutException import java.util.concurrent.atomic.AtomicReference @@ -9,9 +10,12 @@ import akka.testkit.TestProbe import akka.util.{ByteString, Timeout} import cats.effect.Resource import io.iohk.ethereum.Mocks.MockValidatorsAlwaysSucceed -import io.iohk.ethereum.blockchain.sync.{BlockchainHostActor, FastSync, TestSyncConfig} -import io.iohk.ethereum.db.components.{SharedEphemDataSources, Storages} -import io.iohk.ethereum.db.storage.AppStateStorage +import io.iohk.ethereum.blockchain.sync.regular.BlockBroadcasterActor +import io.iohk.ethereum.blockchain.sync.regular.BlockBroadcasterActor.BroadcastBlock +import io.iohk.ethereum.blockchain.sync.{BlockBroadcast, BlockchainHostActor, FastSync, TestSyncConfig} +import io.iohk.ethereum.db.components.{SharedRocksDbDataSources, Storages} +import io.iohk.ethereum.db.dataSource.{RocksDbConfig, RocksDbDataSource} +import io.iohk.ethereum.db.storage.{AppStateStorage, Namespaces} import io.iohk.ethereum.db.storage.pruning.{ArchivePruning, PruningMode} import io.iohk.ethereum.domain.{Account, Address, Block, Blockchain, BlockchainImpl} import io.iohk.ethereum.ledger.InMemoryWorldStateProxy @@ -22,6 +26,7 @@ import io.iohk.ethereum.network.discovery.Node import io.iohk.ethereum.network.discovery.PeerDiscoveryManager.{DiscoveredNodesInfo, DiscoveryNodeInfo} import io.iohk.ethereum.network.handshaker.{EtcHandshaker, EtcHandshakerConfiguration, Handshaker} import io.iohk.ethereum.network.p2p.EthereumMessageDecoder +import io.iohk.ethereum.network.p2p.messages.CommonMessages.NewBlock import io.iohk.ethereum.network.rlpx.AuthHandshaker import io.iohk.ethereum.network.rlpx.RLPxConnectionHandler.RLPxConfiguration import io.iohk.ethereum.network.{EtcPeerManagerActor, ForkResolver, KnownNodesManager, PeerEventBusActor, PeerManagerActor, ServerActor} @@ -51,8 +56,8 @@ class FastSyncItSpec extends AsyncFlatSpec with Matchers with BeforeAndAfter { _ <- peer1.startFastSync() _ <- peer1.waitForFastSyncFinish() } yield { - assert(peer1.bl.getBestBlockNumber() == peer2.bl.getBestBlockNumber() - peer2.syncConfig.targetBlockOffset) - assert(peer1.bl.getBestBlockNumber() == peer3.bl.getBestBlockNumber() - peer3.syncConfig.targetBlockOffset) + assert(peer1.bl.getBestBlockNumber() == peer2.bl.getBestBlockNumber() - peer2.testSyncConfig.targetBlockOffset) + assert(peer1.bl.getBestBlockNumber() == peer3.bl.getBestBlockNumber() - peer3.testSyncConfig.targetBlockOffset) } } @@ -66,11 +71,25 @@ class FastSyncItSpec extends AsyncFlatSpec with Matchers with BeforeAndAfter { _ <- peer1.waitForFastSyncFinish() } yield { val trie = peer1.getBestBlockTrie() - assert(peer1.bl.getBestBlockNumber() == peer2.bl.getBestBlockNumber() - peer2.syncConfig.targetBlockOffset) - assert(peer1.bl.getBestBlockNumber() == peer3.bl.getBestBlockNumber() - peer3.syncConfig.targetBlockOffset) + assert(peer1.bl.getBestBlockNumber() == peer2.bl.getBestBlockNumber() - peer2.testSyncConfig.targetBlockOffset) + assert(peer1.bl.getBestBlockNumber() == peer3.bl.getBestBlockNumber() - peer3.testSyncConfig.targetBlockOffset) assert(trie.isDefined) } } + + + it should "should update target block" in customTestCaseResourceM(FakePeer.start2FakePeersRes()) { + case (peer1, peer2) => + for { + _ <- peer2.importNBlocksToTheTopForm(peer2.getCurrentState(), 1000)(IdentityUpdate) + _ <- peer1.connectToPeers(Set(peer2.node)) + _ <- peer2.syncUntil(2000)(IdentityUpdate).startAndForget + _ <- peer1.startFastSync() + _ <- peer1.waitForFastSyncFinish() + } yield { + assert(peer1.bl.getBestBlockNumber() == peer2.bl.getBestBlockNumber() - peer2.testSyncConfig.targetBlockOffset) + } + } } object FastSyncItSpec { @@ -108,7 +127,7 @@ object FastSyncItSpec { val IdentityUpdate: (BigInt, InMemoryWorldStateProxy) => InMemoryWorldStateProxy = (_, world) => world - def updateWorldWithNRandomAcounts(n:Int, world: InMemoryWorldStateProxy): InMemoryWorldStateProxy = { + def updateWorldWithNRandomAccounts(n:Int, world: InMemoryWorldStateProxy): InMemoryWorldStateProxy = { val resultWorld = (0 until n).foldLeft(world) { (world, num) => val randomBalance = num val randomAddress = Address(num) @@ -125,7 +144,7 @@ object FastSyncItSpec { def updateStateAtBlock(blockWithUpdate: BigInt): (BigInt, InMemoryWorldStateProxy) => InMemoryWorldStateProxy = { (blockNr: BigInt, world: InMemoryWorldStateProxy) => if (blockNr == blockWithUpdate) { - updateWorldWithNRandomAcounts(1000, world) + updateWorldWithNRandomAccounts(1000, world) } else { IdentityUpdate(blockNr, world) } @@ -151,12 +170,30 @@ object FastSyncItSpec { discoveryStatus = ServerStatus.NotListening ) + lazy val tempDir = Files.createTempDirectory("temp-fast-sync") + + def getRockDbTestConfig(dbPath: String) = { + new RocksDbConfig { + override val createIfMissing: Boolean = true + override val paranoidChecks: Boolean = false + override val path: String = dbPath + override val maxThreads: Int = 1 + override val maxOpenFiles: Int = 32 + override val verifyChecksums: Boolean = false + override val levelCompaction: Boolean = true + override val blockSize: Long = 16384 + override val blockCacheSize: Long = 33554432 + } + } + sealed trait LocalPruningConfigBuilder extends PruningConfigBuilder { override lazy val pruningMode: PruningMode = ArchivePruning } lazy val nodeStatusHolder = new AtomicReference(nodeStatus) - lazy val storagesInstance = new SharedEphemDataSources with LocalPruningConfigBuilder with Storages.DefaultStorages + lazy val storagesInstance = new SharedRocksDbDataSources with LocalPruningConfigBuilder with Storages.DefaultStorages { + override lazy val dataSource: RocksDbDataSource = RocksDbDataSource(getRockDbTestConfig(tempDir.toAbsolutePath.toString), Namespaces.nsSeq) + } lazy val blockchainConfig = Config.blockchains.blockchainConfig /** * Default persist interval is 20s, which is too long for tests. As in all tests we treat peer as connected when @@ -252,15 +289,21 @@ object FastSyncItSpec { val testSyncConfig = syncConfig.copy( minPeersToChooseTargetBlock = 1, - peersScanInterval = 1.second, + peersScanInterval = 5.milliseconds, blockHeadersPerRequest = 200, blockBodiesPerRequest = 50, receiptsPerRequest = 50, fastSyncThrottle = 10.milliseconds, startRetryInterval = 50.milliseconds, - nodesPerRequest = 200 + nodesPerRequest = 200, + maxTargetDifference = 1, + syncRetryInterval = 50.milliseconds ) + lazy val broadcaster = new BlockBroadcast(etcPeerManager, testSyncConfig) + + lazy val broadcasterActor = system.actorOf(BlockBroadcasterActor.props(broadcaster, peerEventBus, etcPeerManager, testSyncConfig, system.scheduler)) + lazy val fastSync = system.actorOf(FastSync.props( storagesInstance.storages.fastSyncStateStorage, storagesInstance.storages.appStateStorage, @@ -282,6 +325,10 @@ object FastSyncItSpec { ) } + private def broadcastBlock(block: Block, td: BigInt) = { + broadcasterActor ! BroadcastBlock(NewBlock(block, td)) + } + def getCurrentState(): BlockchainState = { val bestBlock = bl.getBestBlock() val currentWorldState = getMptForBlock(bestBlock) @@ -302,7 +349,10 @@ object FastSyncItSpec { } def shutdown(): Task[Unit] = { - Task.deferFuture(system.terminate()).map(_ => ()) + for { + _ <- Task.deferFuture(system.terminate()) + _ <- Task(storagesInstance.dataSource.destroy()) + } yield () } def connectToPeers(nodes: Set[DiscoveryNodeInfo]): Task[Unit] = { @@ -343,12 +393,29 @@ object FastSyncItSpec { go(startState.bestBlock, startState.currentTd, startState.currentWorldState, n) } + def syncUntil(n: BigInt)(updateWorldForBlock: (BigInt, InMemoryWorldStateProxy) => InMemoryWorldStateProxy): Task[Unit] = { + Task(bl.getBestBlock()).flatMap { block => + if (block.number >= n) { + Task(()) + } else { + Task { + val currentTd = bl.getTotalDifficultyByHash(block.hash).get + val currentWolrd = getMptForBlock(block) + val (newBlock, newTd, newWorld) = createChildBlock(block, currentTd, currentWolrd)(updateWorldForBlock) + bl.save(newBlock, Seq(), newTd, saveAsBestBlock = true) + bl.persistCachedNodes() + broadcastBlock(newBlock, newTd) + }.flatMap(_ => syncUntil(n)(updateWorldForBlock)) + } + } + } + def startFastSync(): Task[Unit] = Task { fastSync ! FastSync.Start } def waitForFastSyncFinish(): Task[Boolean] = { - retryUntilWithDelay(Task(storagesInstance.storages.appStateStorage.isFastSyncDone()), 1.second, 30){ isDone => + retryUntilWithDelay(Task(storagesInstance.storages.appStateStorage.isFastSyncDone()), 1.second, 90){ isDone => isDone } } @@ -357,7 +424,7 @@ object FastSyncItSpec { def getBestBlockTrie(): Option[MptNode] = { Try { val bestBlock = bl.getBestBlock() - val bestStateRoot =bestBlock.header.stateRoot + val bestStateRoot = bestBlock.header.stateRoot MptTraversals.parseTrieIntoMemory(HashNode(bestStateRoot.toArray), storagesInstance.storages.stateStorage.getBackingStorage(bestBlock.number)) }.toOption } @@ -391,4 +458,4 @@ object FastSyncItSpec { } { case (peer, peer1, peer2) => Task.parMap3(peer.shutdown(), peer1.shutdown(), peer2.shutdown())((_ ,_, _)=> ())} } } -} \ No newline at end of file +} diff --git a/src/main/scala/io/iohk/ethereum/blockchain/sync/FastSyncTargetBlockSelector.scala b/src/main/scala/io/iohk/ethereum/blockchain/sync/FastSyncTargetBlockSelector.scala index 42b8b5c947..6f6eb2ee4f 100644 --- a/src/main/scala/io/iohk/ethereum/blockchain/sync/FastSyncTargetBlockSelector.scala +++ b/src/main/scala/io/iohk/ethereum/blockchain/sync/FastSyncTargetBlockSelector.scala @@ -34,9 +34,9 @@ class FastSyncTargetBlockSelector( val peersUsedToChooseTarget = peersToDownloadFrom.filter(_._2.forkAccepted) if (peersUsedToChooseTarget.size >= minPeersToChooseTargetBlock) { - peersUsedToChooseTarget.foreach { case (peer, PeerInfo(status, _, _, _)) => + peersUsedToChooseTarget.foreach { case (peer, PeerInfo(_, _, _, _, bestBlockHash)) => peerEventBus ! Subscribe(MessageClassifier(Set(BlockHeaders.code), PeerSelector.WithId(peer.id))) - etcPeerManager ! EtcPeerManagerActor.SendMessage(GetBlockHeaders(Right(status.bestHash), 1, 0, reverse = false), peer.id) + etcPeerManager ! EtcPeerManagerActor.SendMessage(GetBlockHeaders(Right(bestBlockHash), 1, 0, reverse = false), peer.id) } log.debug("Asking {} peers for block headers", peersUsedToChooseTarget.size) val timeout = scheduler.scheduleOnce(peerResponseTimeout, self, BlockHeadersTimeout) diff --git a/src/main/scala/io/iohk/ethereum/blockchain/sync/PeersClient.scala b/src/main/scala/io/iohk/ethereum/blockchain/sync/PeersClient.scala index 4ff35e66cb..c8396e705a 100644 --- a/src/main/scala/io/iohk/ethereum/blockchain/sync/PeersClient.scala +++ b/src/main/scala/io/iohk/ethereum/blockchain/sync/PeersClient.scala @@ -134,7 +134,7 @@ object PeersClient { def bestPeer(peersToDownloadFrom: Map[Peer, PeerInfo]): Option[Peer] = { val peersToUse = peersToDownloadFrom .collect { - case (ref, PeerInfo(_, totalDifficulty, true, _)) => + case (ref, PeerInfo(_, totalDifficulty, true, _, _)) => (ref, totalDifficulty) } diff --git a/src/main/scala/io/iohk/ethereum/blockchain/sync/regular/OldRegularSync.scala b/src/main/scala/io/iohk/ethereum/blockchain/sync/regular/OldRegularSync.scala index 68621eb5cf..86ec1a73b1 100644 --- a/src/main/scala/io/iohk/ethereum/blockchain/sync/regular/OldRegularSync.scala +++ b/src/main/scala/io/iohk/ethereum/blockchain/sync/regular/OldRegularSync.scala @@ -559,7 +559,7 @@ class OldRegularSync( private def bestPeer: Option[Peer] = { val peersToUse = peersToDownloadFrom - .collect{ case (ref, PeerInfo(_, totalDifficulty, true, _)) => (ref, totalDifficulty) } + .collect{ case (ref, PeerInfo(_, totalDifficulty, true, _, _)) => (ref, totalDifficulty) } if (peersToUse.nonEmpty) { val (peer, _) = peersToUse.maxBy{ case (_, td) => td } diff --git a/src/main/scala/io/iohk/ethereum/db/dataSource/EphemDataSource.scala b/src/main/scala/io/iohk/ethereum/db/dataSource/EphemDataSource.scala index ad06c8af60..f7cb8c9464 100644 --- a/src/main/scala/io/iohk/ethereum/db/dataSource/EphemDataSource.scala +++ b/src/main/scala/io/iohk/ethereum/db/dataSource/EphemDataSource.scala @@ -9,12 +9,15 @@ class EphemDataSource(var storage: Map[ByteBuffer, Array[Byte]]) extends DataSou * key.drop to remove namespace prefix from the key * @return key values paris from this storage */ - def getAll(namespace: Namespace): Seq[(IndexedSeq[Byte], IndexedSeq[Byte])] = + def getAll(namespace: Namespace): Seq[(IndexedSeq[Byte], IndexedSeq[Byte])] = synchronized { storage.toSeq.map{case (key, value) => (key.array().drop(namespace.length).toIndexedSeq, value.toIndexedSeq)} + } - override def get(namespace: Namespace, key: Key): Option[Value] = storage.get(ByteBuffer.wrap((namespace ++ key).toArray)).map(_.toIndexedSeq) + override def get(namespace: Namespace, key: Key): Option[Value] = synchronized { + storage.get(ByteBuffer.wrap((namespace ++ key).toArray)).map(_.toIndexedSeq) + } - override def update(namespace: Namespace, toRemove: Seq[Key], toUpsert: Seq[(Key, Value)]): DataSource = { + override def update(namespace: Namespace, toRemove: Seq[Key], toUpsert: Seq[(Key, Value)]): DataSource = synchronized { val afterRemoval = toRemove.foldLeft(storage)((storage, key) => storage - ByteBuffer.wrap((namespace ++ key).toArray)) val afterUpdate = toUpsert.foldLeft(afterRemoval)((storage, toUpdate) => storage + (ByteBuffer.wrap((namespace ++ toUpdate._1).toArray) -> toUpdate._2.toArray)) @@ -22,7 +25,7 @@ class EphemDataSource(var storage: Map[ByteBuffer, Array[Byte]]) extends DataSou this } - override def clear: DataSource = { + override def clear: DataSource = synchronized { storage = Map() this } @@ -31,7 +34,7 @@ class EphemDataSource(var storage: Map[ByteBuffer, Array[Byte]]) extends DataSou override def destroy(): Unit = () - override def updateOptimized(toRemove: Seq[Array[Byte]], toUpsert: Seq[(Array[Byte], Array[Byte])]): DataSource = { + override def updateOptimized(toRemove: Seq[Array[Byte]], toUpsert: Seq[(Array[Byte], Array[Byte])]): DataSource = synchronized { val afterRemoval = toRemove.foldLeft(storage)((storage, key) => storage - ByteBuffer.wrap(key)) val afterUpdate = toUpsert.foldLeft(afterRemoval)((storage, toUpdate) => storage + (ByteBuffer.wrap(toUpdate._1) -> toUpdate._2)) diff --git a/src/main/scala/io/iohk/ethereum/network/EtcPeerManagerActor.scala b/src/main/scala/io/iohk/ethereum/network/EtcPeerManagerActor.scala index 8e930fcfc3..80794461e9 100644 --- a/src/main/scala/io/iohk/ethereum/network/EtcPeerManagerActor.scala +++ b/src/main/scala/io/iohk/ethereum/network/EtcPeerManagerActor.scala @@ -1,6 +1,7 @@ package io.iohk.ethereum.network import akka.actor.{Actor, ActorLogging, ActorRef, Props} +import akka.util.ByteString import io.iohk.ethereum.db.storage.AppStateStorage import io.iohk.ethereum.network.PeerActor.{DisconnectPeer, SendMessage} import io.iohk.ethereum.network.EtcPeerManagerActor._ @@ -37,6 +38,10 @@ class EtcPeerManagerActor(peerManagerActor: ActorRef, peerEventBusActor: ActorRe def handleMessages(peersWithInfo: PeersWithInfo): Receive = handleCommonMessages(peersWithInfo) orElse handlePeersInfoEvents(peersWithInfo) + private def peerHasUdpatedBestBlock(peerInfo: PeerInfo): Boolean = { + val peerBestBlockIsItsGenesisBlock = peerInfo.bestBlockHash == peerInfo.remoteStatus.genesisHash + peerBestBlockIsItsGenesisBlock || (!peerBestBlockIsItsGenesisBlock && peerInfo.maxBlockNumber > 0) + } /** * Processes both messages for sending messages and for requesting peer information * @@ -44,9 +49,10 @@ class EtcPeerManagerActor(peerManagerActor: ActorRef, peerEventBusActor: ActorRe */ private def handleCommonMessages(peersWithInfo: PeersWithInfo): Receive = { case GetHandshakedPeers => - // Provide only peers which already responded to request for best block hash + // Provide only peers which already responded to request for best block hash, and theirs best block hash is different + // form their genesis block sender() ! HandshakedPeers(peersWithInfo.collect { - case (_, PeerWithInfo(peer, peerInfo)) if peerInfo.maxBlockNumber > 0 => peer -> peerInfo + case (_, PeerWithInfo(peer, peerInfo)) if peerHasUdpatedBestBlock(peerInfo) => peer -> peerInfo }) case PeerInfoRequest(peerId) => @@ -112,7 +118,7 @@ class EtcPeerManagerActor(peerManagerActor: ActorRef, peerEventBusActor: ActorRe * @return new updated peer info */ private def handleSentMessage(message: Message, initialPeerWithInfo: PeerWithInfo): PeerInfo = - updateMaxBlock(message)(initialPeerWithInfo.peerInfo) + initialPeerWithInfo.peerInfo /** * Processes the message and the old peer info and returns the peer info @@ -121,11 +127,12 @@ class EtcPeerManagerActor(peerManagerActor: ActorRef, peerEventBusActor: ActorRe * @param initialPeerWithInfo from before the message was processed * @return new updated peer info */ - private def handleReceivedMessage(message: Message, initialPeerWithInfo: PeerWithInfo): PeerInfo = + private def handleReceivedMessage(message: Message, initialPeerWithInfo: PeerWithInfo): PeerInfo = { (updateTotalDifficulty(message) _ andThen updateForkAccepted(message, initialPeerWithInfo.peer) andThen updateMaxBlock(message) )(initialPeerWithInfo.peerInfo) + } /** @@ -178,24 +185,28 @@ class EtcPeerManagerActor(peerManagerActor: ActorRef, peerEventBusActor: ActorRe * @return new peer info with the max block number updated */ private def updateMaxBlock(message: Message)(initialPeerInfo: PeerInfo): PeerInfo = { - def update(ns: Seq[BigInt]): PeerInfo = { - val maxBlockNumber = ns.fold(0: BigInt) { case (a, b) => if (a > b) a else b } - if (maxBlockNumber > appStateStorage.getEstimatedHighestBlock()) - appStateStorage.putEstimatedHighestBlock(maxBlockNumber) - - if (maxBlockNumber > initialPeerInfo.maxBlockNumber) - initialPeerInfo.withMaxBlockNumber(maxBlockNumber) - else + def update(ns: Seq[(BigInt, ByteString)]): PeerInfo = { + if (ns.isEmpty) { initialPeerInfo + } else { + val (maxBlockNumber, maxBlockHash) = ns.maxBy(_._1) + if (maxBlockNumber > appStateStorage.getEstimatedHighestBlock()) + appStateStorage.putEstimatedHighestBlock(maxBlockNumber) + + if (maxBlockNumber > initialPeerInfo.maxBlockNumber) + initialPeerInfo.withMaxBlockNumber(maxBlockNumber).withMaxBlockHash(maxBlockHash) + else + initialPeerInfo + } } message match { case m: BlockHeaders => - update(m.headers.map(_.number)) + update(m.headers.map(header => (header.number, header.hash))) case m: NewBlock => - update(Seq(m.block.header.number)) + update(Seq((m.block.header.number, m.block.header.hash))) case m: NewBlockHashes => - update(m.hashes.map(_.number)) + update(m.hashes.map(h => (h.number, h.hash))) case _ => initialPeerInfo } } @@ -209,7 +220,8 @@ object EtcPeerManagerActor { case class PeerInfo(remoteStatus: Status, totalDifficulty: BigInt, forkAccepted: Boolean, - maxBlockNumber: BigInt) extends HandshakeResult { + maxBlockNumber: BigInt, + bestBlockHash: ByteString) extends HandshakeResult { def withTotalDifficulty(totalDifficulty: BigInt): PeerInfo = copy(totalDifficulty = totalDifficulty) @@ -217,6 +229,7 @@ object EtcPeerManagerActor { def withMaxBlockNumber(maxBlockNumber: BigInt): PeerInfo = copy(maxBlockNumber = maxBlockNumber) + def withMaxBlockHash(bestBlockHash: ByteString): PeerInfo = copy(bestBlockHash = bestBlockHash) } private case class PeerWithInfo(peer: Peer, peerInfo: PeerInfo) diff --git a/src/main/scala/io/iohk/ethereum/network/handshaker/EtcForkBlockExchangeState.scala b/src/main/scala/io/iohk/ethereum/network/handshaker/EtcForkBlockExchangeState.scala index 9da1c72ee6..8ce79078a8 100644 --- a/src/main/scala/io/iohk/ethereum/network/handshaker/EtcForkBlockExchangeState.scala +++ b/src/main/scala/io/iohk/ethereum/network/handshaker/EtcForkBlockExchangeState.scala @@ -35,7 +35,7 @@ case class EtcForkBlockExchangeState(handshakerConfiguration: EtcHandshakerConfi if (forkResolver.isAccepted(fork)) { log.debug("Fork is accepted") //setting maxBlockNumber to 0, as we do not know best block number yet - ConnectedState(PeerInfo(remoteStatus, remoteStatus.totalDifficulty, true, 0)) + ConnectedState(PeerInfo(remoteStatus, remoteStatus.totalDifficulty, true, 0, remoteStatus.bestHash)) } else { log.debug("Fork is not accepted") DisconnectedState[PeerInfo](Disconnect.Reasons.UselessPeer) @@ -43,7 +43,7 @@ case class EtcForkBlockExchangeState(handshakerConfiguration: EtcHandshakerConfi case None => log.debug("Peer did not respond with fork block header") - ConnectedState(PeerInfo(remoteStatus, remoteStatus.totalDifficulty, false, 0)) + ConnectedState(PeerInfo(remoteStatus, remoteStatus.totalDifficulty, false, 0, remoteStatus.bestHash)) } } diff --git a/src/main/scala/io/iohk/ethereum/network/handshaker/EtcNodeStatusExchangeState.scala b/src/main/scala/io/iohk/ethereum/network/handshaker/EtcNodeStatusExchangeState.scala index c226511e16..1d176849ab 100644 --- a/src/main/scala/io/iohk/ethereum/network/handshaker/EtcNodeStatusExchangeState.scala +++ b/src/main/scala/io/iohk/ethereum/network/handshaker/EtcNodeStatusExchangeState.scala @@ -32,7 +32,7 @@ case class EtcNodeStatusExchangeState(handshakerConfiguration: EtcHandshakerConf case Some(forkResolver) => EtcForkBlockExchangeState(handshakerConfiguration, forkResolver, remoteStatus) case None => - ConnectedState(PeerInfo(remoteStatus, remoteStatus.totalDifficulty, true, 0)) + ConnectedState(PeerInfo(remoteStatus, remoteStatus.totalDifficulty, true, 0, remoteStatus.bestHash)) } } else DisconnectedState(Reasons.DisconnectRequested) diff --git a/src/test/scala/io/iohk/ethereum/Mocks.scala b/src/test/scala/io/iohk/ethereum/Mocks.scala index bb2de6bdd3..7dc0520381 100644 --- a/src/test/scala/io/iohk/ethereum/Mocks.scala +++ b/src/test/scala/io/iohk/ethereum/Mocks.scala @@ -112,7 +112,7 @@ object Mocks { case class MockHandshakerAlwaysSucceeds(initialStatus: Status, currentMaxBlockNumber: BigInt, forkAccepted: Boolean) extends Handshaker[PeerInfo] { override val handshakerState: HandshakerState[PeerInfo] = - ConnectedState(PeerInfo(initialStatus, initialStatus.totalDifficulty, forkAccepted, currentMaxBlockNumber)) + ConnectedState(PeerInfo(initialStatus, initialStatus.totalDifficulty, forkAccepted, currentMaxBlockNumber, initialStatus.bestHash)) override def copy(handshakerState: HandshakerState[PeerInfo]): Handshaker[PeerInfo] = this } diff --git a/src/test/scala/io/iohk/ethereum/blockchain/sync/BlockBroadcastSpec.scala b/src/test/scala/io/iohk/ethereum/blockchain/sync/BlockBroadcastSpec.scala index bac32f6f89..6f49e5d370 100644 --- a/src/test/scala/io/iohk/ethereum/blockchain/sync/BlockBroadcastSpec.scala +++ b/src/test/scala/io/iohk/ethereum/blockchain/sync/BlockBroadcastSpec.scala @@ -141,7 +141,8 @@ class BlockBroadcastSpec extends FlatSpec with Matchers { remoteStatus = peerStatus, totalDifficulty = peerStatus.totalDifficulty, forkAccepted = false, - maxBlockNumber = Fixtures.Blocks.Block3125369.header.number + maxBlockNumber = Fixtures.Blocks.Block3125369.header.number, + bestBlockHash = peerStatus.bestHash ) val peerProbe = TestProbe() diff --git a/src/test/scala/io/iohk/ethereum/blockchain/sync/SyncControllerSpec.scala b/src/test/scala/io/iohk/ethereum/blockchain/sync/SyncControllerSpec.scala index 7a953bc422..904d5ef91e 100644 --- a/src/test/scala/io/iohk/ethereum/blockchain/sync/SyncControllerSpec.scala +++ b/src/test/scala/io/iohk/ethereum/blockchain/sync/SyncControllerSpec.scala @@ -613,18 +613,18 @@ class SyncControllerSpec extends FlatSpec with Matchers with BeforeAndAfter with val peer4Status= Status(1, 1, 20, ByteString("peer4_bestHash"), ByteString("unused")) val allPeers = Map( - peer1 -> PeerInfo(peer1Status, forkAccepted = true, totalDifficulty = peer1Status.totalDifficulty, maxBlockNumber = 0), - peer2 -> PeerInfo(peer2Status, forkAccepted = true, totalDifficulty = peer1Status.totalDifficulty, maxBlockNumber = 0), - peer3 -> PeerInfo(peer3Status, forkAccepted = false, totalDifficulty = peer1Status.totalDifficulty, maxBlockNumber = 0), - peer4 -> PeerInfo(peer4Status, forkAccepted = false, totalDifficulty = peer1Status.totalDifficulty, maxBlockNumber = 0) + peer1 -> PeerInfo(peer1Status, forkAccepted = true, totalDifficulty = peer1Status.totalDifficulty, maxBlockNumber = 0, bestBlockHash = peer1Status.bestHash), + peer2 -> PeerInfo(peer2Status, forkAccepted = true, totalDifficulty = peer1Status.totalDifficulty, maxBlockNumber = 0, bestBlockHash = peer2Status.bestHash), + peer3 -> PeerInfo(peer3Status, forkAccepted = false, totalDifficulty = peer1Status.totalDifficulty, maxBlockNumber = 0, bestBlockHash = peer3Status.bestHash), + peer4 -> PeerInfo(peer4Status, forkAccepted = false, totalDifficulty = peer1Status.totalDifficulty, maxBlockNumber = 0, bestBlockHash = peer4Status.bestHash) ) val twoAcceptedPeers = Map( - peer1 -> PeerInfo(peer1Status, forkAccepted = true, totalDifficulty = peer1Status.totalDifficulty, maxBlockNumber = 0), - peer2 -> PeerInfo(peer2Status, forkAccepted = true, totalDifficulty = peer1Status.totalDifficulty, maxBlockNumber = 0) + peer1 -> PeerInfo(peer1Status, forkAccepted = true, totalDifficulty = peer1Status.totalDifficulty, maxBlockNumber = 0, bestBlockHash = peer1Status.bestHash), + peer2 -> PeerInfo(peer2Status, forkAccepted = true, totalDifficulty = peer1Status.totalDifficulty, maxBlockNumber = 0, bestBlockHash = peer2Status.bestHash) ) - val singlePeer = Map(peer1 -> PeerInfo(peer1Status, forkAccepted = true, totalDifficulty = peer1Status.totalDifficulty, maxBlockNumber = 0)) + val singlePeer = Map(peer1 -> PeerInfo(peer1Status, forkAccepted = true, totalDifficulty = peer1Status.totalDifficulty, maxBlockNumber = 0, bestBlockHash = peer1Status.bestHash)) def sendNewTargetBlock(targetBlockHeader: BlockHeader, peer: Peer, diff --git a/src/test/scala/io/iohk/ethereum/blockchain/sync/regular/OldRegularSyncSpec.scala b/src/test/scala/io/iohk/ethereum/blockchain/sync/regular/OldRegularSyncSpec.scala index ad356240b6..bf07f26a03 100644 --- a/src/test/scala/io/iohk/ethereum/blockchain/sync/regular/OldRegularSyncSpec.scala +++ b/src/test/scala/io/iohk/ethereum/blockchain/sync/regular/OldRegularSyncSpec.scala @@ -597,7 +597,7 @@ class OldRegularSyncSpec extends WordSpec with Matchers with MockFactory with Ev val peer1 = Peer(new InetSocketAddress("127.0.0.1", 0), TestProbe().ref, incomingConnection = false) val peer1Status = Status(1, 1, 1, ByteString("peer1_bestHash"), ByteString("unused")) - val peer1Info = PeerInfo(peer1Status, forkAccepted = true, totalDifficulty = peer1Status.totalDifficulty, maxBlockNumber = 0) + val peer1Info = PeerInfo(peer1Status, forkAccepted = true, totalDifficulty = peer1Status.totalDifficulty, maxBlockNumber = 0, bestBlockHash = peer1Status.bestHash) val peer1Id: PeerId = peer1.id val handshakedPeers = Map(peer1 -> peer1Info) diff --git a/src/test/scala/io/iohk/ethereum/blockchain/sync/regular/RegularSyncFixtures.scala b/src/test/scala/io/iohk/ethereum/blockchain/sync/regular/RegularSyncFixtures.scala index 808844e537..e00cf0e5a7 100644 --- a/src/test/scala/io/iohk/ethereum/blockchain/sync/regular/RegularSyncFixtures.scala +++ b/src/test/scala/io/iohk/ethereum/blockchain/sync/regular/RegularSyncFixtures.scala @@ -140,7 +140,7 @@ trait RegularSyncFixtures { self: Matchers with MockFactory => def getPeerInfo(peer: Peer): PeerInfo = { val status = Status(1, 1, 1, ByteString(s"${peer.id}_bestHash"), ByteString("unused")) - PeerInfo(status, forkAccepted = true, totalDifficulty = status.totalDifficulty, maxBlockNumber = 0) + PeerInfo(status, forkAccepted = true, totalDifficulty = status.totalDifficulty, maxBlockNumber = 0, bestBlockHash = status.bestHash) } def peerByNumber(number: Int): Peer = handshakedPeers.keys.toList.sortBy(_.id.value).apply(number) diff --git a/src/test/scala/io/iohk/ethereum/jsonrpc/DebugServiceSpec.scala b/src/test/scala/io/iohk/ethereum/jsonrpc/DebugServiceSpec.scala index 7fbeff14bd..d5ae8d7fa3 100644 --- a/src/test/scala/io/iohk/ethereum/jsonrpc/DebugServiceSpec.scala +++ b/src/test/scala/io/iohk/ethereum/jsonrpc/DebugServiceSpec.scala @@ -72,7 +72,8 @@ class DebugServiceSpec extends FlatSpec with Matchers with MockFactory with Scal remoteStatus = peerStatus, totalDifficulty = peerStatus.totalDifficulty, forkAccepted = false, - maxBlockNumber = Fixtures.Blocks.Block3125369.header.number + maxBlockNumber = Fixtures.Blocks.Block3125369.header.number, + bestBlockHash = peerStatus.bestHash ) val peer1Probe = TestProbe() val peer1 = Peer(new InetSocketAddress("127.0.0.1", 1), peer1Probe.ref, incomingConnection = false) diff --git a/src/test/scala/io/iohk/ethereum/jsonrpc/JsonRpcControllerSpec.scala b/src/test/scala/io/iohk/ethereum/jsonrpc/JsonRpcControllerSpec.scala index 3dd07a56aa..02ae6d8dcb 100644 --- a/src/test/scala/io/iohk/ethereum/jsonrpc/JsonRpcControllerSpec.scala +++ b/src/test/scala/io/iohk/ethereum/jsonrpc/JsonRpcControllerSpec.scala @@ -508,7 +508,8 @@ class JsonRpcControllerSpec extends FlatSpec with Matchers with PropertyChecks w remoteStatus = peerStatus, totalDifficulty = peerStatus.totalDifficulty, forkAccepted = true, - maxBlockNumber = Fixtures.Blocks.Block3125369.header.number + maxBlockNumber = Fixtures.Blocks.Block3125369.header.number, + bestBlockHash = peerStatus.bestHash ) val peers = List(initialPeerInfo) diff --git a/src/test/scala/io/iohk/ethereum/network/EtcPeerManagerSpec.scala b/src/test/scala/io/iohk/ethereum/network/EtcPeerManagerSpec.scala index 7014a9a359..a34adb740a 100644 --- a/src/test/scala/io/iohk/ethereum/network/EtcPeerManagerSpec.scala +++ b/src/test/scala/io/iohk/ethereum/network/EtcPeerManagerSpec.scala @@ -63,6 +63,7 @@ class EtcPeerManagerSpec extends FlatSpec with Matchers { val expectedPeerInfo = initialPeerInfo .withMaxBlockNumber(initialPeerInfo.maxBlockNumber + 4) .withTotalDifficulty(newBlockTD) + .withMaxBlockHash(firstHeader.hash) requestSender.expectMsg(PeerInfoResponse(Some(expectedPeerInfo))) } @@ -79,7 +80,9 @@ class EtcPeerManagerSpec extends FlatSpec with Matchers { //then requestSender.send(peersInfoHolder, PeerInfoRequest(peer1.id)) - requestSender.expectMsg(PeerInfoResponse(Some(peer1Info.withMaxBlockNumber(initialPeerInfo.maxBlockNumber + 4)))) + requestSender.expectMsg(PeerInfoResponse( + Some(peer1Info.withMaxBlockNumber(initialPeerInfo.maxBlockNumber + 4).withMaxBlockHash(firstHeader.hash))) + ) } it should "update max peer when receiving new block hashes" in new TestSetup { @@ -88,80 +91,14 @@ class EtcPeerManagerSpec extends FlatSpec with Matchers { //given val firstBlockHash: BlockHash = BlockHash(ByteString(Hex.decode("00" * 32)), peer1Info.maxBlockNumber + 2) - val secondBlockHash: BlockHash = BlockHash(ByteString(Hex.decode("00" * 32)), peer1Info.maxBlockNumber + 5) + val secondBlockHash: BlockHash = BlockHash(ByteString(Hex.decode("01" * 32)), peer1Info.maxBlockNumber + 5) //when peersInfoHolder ! MessageFromPeer(NewBlockHashes(Seq(firstBlockHash, secondBlockHash)), peer1.id) //then requestSender.send(peersInfoHolder, PeerInfoRequest(peer1.id)) - requestSender.expectMsg(PeerInfoResponse(Some(peer1Info.withMaxBlockNumber(peer1Info.maxBlockNumber + 5)))) - } - - it should "update max peer when sending new block" in new TestSetup { - peerEventBus.expectMsg(Subscribe(PeerHandshaked)) - setupNewPeer(peer1, peer1Probe, peer1Info) - - //given - val firstHeader: BlockHeader = baseBlockHeader.copy(number = peer1Info.maxBlockNumber + 4) - val firstBlock = NewBlock(Block(firstHeader, BlockBody(Nil, Nil)), 300) - - val secondHeader: BlockHeader = baseBlockHeader.copy(number = peer1Info.maxBlockNumber + 2) - val secondBlock = NewBlock(Block(secondHeader, BlockBody(Nil, Nil)), 300) - - //when - peersInfoHolder ! EtcPeerManagerActor.SendMessage(firstBlock, peer1.id) - peersInfoHolder ! EtcPeerManagerActor.SendMessage(secondBlock, peer1.id) - - //then - requestSender.send(peersInfoHolder, PeerInfoRequest(peer1.id)) - requestSender.expectMsg(PeerInfoResponse(Some(peer1Info.withMaxBlockNumber(peer1Info.maxBlockNumber + 4)))) - peerManager.expectMsgAllOf( - PeerManagerActor.SendMessage(firstBlock, peer1.id), - PeerManagerActor.SendMessage(secondBlock, peer1.id) - ) - } - - it should "update max peer when sending block header" in new TestSetup { - peerEventBus.expectMsg(Subscribe(PeerHandshaked)) - setupNewPeer(peer1, peer1Probe, peer1Info) - - //given - val firstHeader: BlockHeader = baseBlockHeader.copy(number = peer1Info.maxBlockNumber + 4) - val secondHeader: BlockHeader = baseBlockHeader.copy(number = peer1Info.maxBlockNumber + 2) - - //when - peersInfoHolder ! EtcPeerManagerActor.SendMessage(BlockHeaders(Seq(firstHeader)), peer1.id) - peersInfoHolder ! EtcPeerManagerActor.SendMessage(BlockHeaders(Seq(secondHeader)), peer1.id) - - //then - requestSender.send(peersInfoHolder, PeerInfoRequest(peer1.id)) - requestSender.expectMsg(PeerInfoResponse(Some(peer1Info.withMaxBlockNumber(peer1Info.maxBlockNumber + 4)))) - peerManager.expectMsgAllOf( - PeerManagerActor.SendMessage(BlockHeaders(Seq(firstHeader)), peer1.id), - PeerManagerActor.SendMessage(BlockHeaders(Seq(secondHeader)), peer1.id) - ) - } - - it should "update max peer when sending new block hashes" in new TestSetup { - peerEventBus.expectMsg(Subscribe(PeerHandshaked)) - setupNewPeer(peer1, peer1Probe, peer1Info) - - //given - val firstBlockHash: BlockHash = BlockHash(ByteString(Hex.decode("00" * 32)), peer1Info.maxBlockNumber + 2) - val secondBlockHash: BlockHash = BlockHash(ByteString(Hex.decode("00" * 32)), peer1Info.maxBlockNumber + 5) - - //when - peersInfoHolder ! EtcPeerManagerActor.SendMessage(NewBlockHashes(Seq(firstBlockHash)), peer1.id) - peersInfoHolder ! EtcPeerManagerActor.SendMessage(NewBlockHashes(Seq(secondBlockHash)), peer1.id) - - //then - requestSender.send(peersInfoHolder, PeerInfoRequest(peer1.id)) - requestSender.expectMsg(PeerInfoResponse(Some(peer1Info.withMaxBlockNumber(peer1Info.maxBlockNumber + 5)))) - peerManager.expectMsgAllOf( - PeerManagerActor.SendMessage(NewBlockHashes(Seq(firstBlockHash)), peer1.id), - PeerManagerActor.SendMessage(NewBlockHashes(Seq(secondBlockHash)), peer1.id) - ) + requestSender.expectMsg(PeerInfoResponse(Some(peer1Info.withMaxBlockNumber(peer1Info.maxBlockNumber + 5).withMaxBlockHash(secondBlockHash.hash)))) } it should "update the peer total difficulty when receiving a NewBlock" in new TestSetup { @@ -257,11 +194,35 @@ class EtcPeerManagerSpec extends FlatSpec with Matchers { val firstHeader: BlockHeader = baseBlockHeader.copy(number = newMaxBlock) // Fresh peer received best block - peersInfoHolder ! EtcPeerManagerActor.SendMessage(BlockHeaders(Seq(firstHeader)), freshPeer.id) + peersInfoHolder ! MessageFromPeer(BlockHeaders(Seq(firstHeader)), freshPeer.id) // After receiving peer best block number, peer should be provided as handshaked peer requestSender.send(peersInfoHolder, GetHandshakedPeers) - requestSender.expectMsg(HandshakedPeers(Map(freshPeer -> freshPeerInfo.withMaxBlockNumber(newMaxBlock)))) + requestSender.expectMsg(HandshakedPeers(Map(freshPeer -> freshPeerInfo.withMaxBlockNumber(newMaxBlock).withMaxBlockHash(firstHeader.hash)))) + } + + it should "provide handshaked peers only with best block number determined even if peers best block is its genesis" in new TestSetup { + peerEventBus.expectMsg(Subscribe(PeerHandshaked)) + + val genesisStatus = peerStatus.copy(bestHash = Fixtures.Blocks.Genesis.header.hash) + val genesisInfo = initialPeerInfo.copy( + remoteStatus = genesisStatus, + maxBlockNumber = Fixtures.Blocks.Genesis.header.number, + bestBlockHash = Fixtures.Blocks.Genesis.header.hash) + + // Freshly handshaked peer without best block determined + setupNewPeer(freshPeer, freshPeerProbe, genesisInfo) + + // if peer best block is its genesis block then it is available right from the start + requestSender.send(peersInfoHolder, GetHandshakedPeers) + requestSender.expectMsg(HandshakedPeers(Map(freshPeer -> genesisInfo))) + + // Fresh peer received best block + peersInfoHolder ! MessageFromPeer(BlockHeaders(Seq(Fixtures.Blocks.Genesis.header)), freshPeer.id) + + // receiving best block does not change a thing, as peer best block is it genesis + requestSender.send(peersInfoHolder, GetHandshakedPeers) + requestSender.expectMsg(HandshakedPeers(Map(freshPeer -> genesisInfo))) } trait TestSetup extends EphemBlockchainTestSetup { @@ -283,7 +244,8 @@ class EtcPeerManagerSpec extends FlatSpec with Matchers { remoteStatus = peerStatus, totalDifficulty = peerStatus.totalDifficulty, forkAccepted = false, - maxBlockNumber = Fixtures.Blocks.Block3125369.header.number + maxBlockNumber = Fixtures.Blocks.Block3125369.header.number, + bestBlockHash = peerStatus.bestHash ) val peer1Probe = TestProbe() diff --git a/src/test/scala/io/iohk/ethereum/network/PeerActorHandshakingSpec.scala b/src/test/scala/io/iohk/ethereum/network/PeerActorHandshakingSpec.scala index 8b39ffe7fe..2f525ab90d 100644 --- a/src/test/scala/io/iohk/ethereum/network/PeerActorHandshakingSpec.scala +++ b/src/test/scala/io/iohk/ethereum/network/PeerActorHandshakingSpec.scala @@ -169,7 +169,7 @@ class PeerActorHandshakingSpec extends FlatSpec with Matchers { val defaultBlockNumber = 1000 val defaultForkAccepted = true - val defaultPeerInfo = PeerInfo(defaultStatus, defaultStatus.totalDifficulty, defaultForkAccepted, defaultBlockNumber) + val defaultPeerInfo = PeerInfo(defaultStatus, defaultStatus.totalDifficulty, defaultForkAccepted, defaultBlockNumber, defaultStatus.bestHash) val defaultReasonDisconnect = Disconnect.Reasons.Other diff --git a/src/test/scala/io/iohk/ethereum/network/PeerEventBusActorSpec.scala b/src/test/scala/io/iohk/ethereum/network/PeerEventBusActorSpec.scala index 86c7cd2ae2..45f2deb110 100644 --- a/src/test/scala/io/iohk/ethereum/network/PeerEventBusActorSpec.scala +++ b/src/test/scala/io/iohk/ethereum/network/PeerEventBusActorSpec.scala @@ -205,7 +205,8 @@ class PeerEventBusActorSpec extends FlatSpec with Matchers { remoteStatus = peerStatus, totalDifficulty = peerStatus.totalDifficulty, forkAccepted = false, - maxBlockNumber = Fixtures.Blocks.Block3125369.header.number + maxBlockNumber = Fixtures.Blocks.Block3125369.header.number, + bestBlockHash = peerStatus.bestHash ) } diff --git a/src/test/scala/io/iohk/ethereum/network/PeerManagerSpec.scala b/src/test/scala/io/iohk/ethereum/network/PeerManagerSpec.scala index 3235e0793b..d1edb11d01 100644 --- a/src/test/scala/io/iohk/ethereum/network/PeerManagerSpec.scala +++ b/src/test/scala/io/iohk/ethereum/network/PeerManagerSpec.scala @@ -209,7 +209,8 @@ class PeerManagerSpec extends FlatSpec with Matchers with Eventually with Normal remoteStatus = peerStatus, totalDifficulty = peerStatus.totalDifficulty, forkAccepted = false, - maxBlockNumber = Fixtures.Blocks.Block3125369.header.number + maxBlockNumber = Fixtures.Blocks.Block3125369.header.number, + bestBlockHash = peerStatus.bestHash ) val peerManager: TestActorRef[PeerManagerActor] = TestActorRef[PeerManagerActor](Props(new PeerManagerActor(peerEventBus.ref, diff --git a/src/test/scala/io/iohk/ethereum/network/handshaker/EtcHandshakerSpec.scala b/src/test/scala/io/iohk/ethereum/network/handshaker/EtcHandshakerSpec.scala index 2d14512d7f..937dde53c7 100644 --- a/src/test/scala/io/iohk/ethereum/network/handshaker/EtcHandshakerSpec.scala +++ b/src/test/scala/io/iohk/ethereum/network/handshaker/EtcHandshakerSpec.scala @@ -36,9 +36,10 @@ class EtcHandshakerSpec extends FlatSpec with Matchers { assert(handshakerAfterStatusOpt.isDefined) handshakerAfterStatusOpt.get.nextMessage match { - case Left(HandshakeSuccess(PeerInfo(initialStatus, totalDifficulty, forkAccepted, currentMaxBlockNumber))) => + case Left(HandshakeSuccess(PeerInfo(initialStatus, totalDifficulty, forkAccepted, currentMaxBlockNumber, bestBlockHash))) => initialStatus shouldBe remoteStatus totalDifficulty shouldBe remoteStatus.totalDifficulty + bestBlockHash shouldBe remoteStatus.bestHash currentMaxBlockNumber shouldBe 0 forkAccepted shouldBe true case _ => fail @@ -72,9 +73,10 @@ class EtcHandshakerSpec extends FlatSpec with Matchers { assert(handshakerAfterForkOpt.isDefined) handshakerAfterForkOpt.get.nextMessage match { - case Left(HandshakeSuccess(PeerInfo(initialStatus, totalDifficulty, forkAccepted, currentMaxBlockNumber))) => + case Left(HandshakeSuccess(PeerInfo(initialStatus, totalDifficulty, forkAccepted, currentMaxBlockNumber, bestBlockHash))) => initialStatus shouldBe remoteStatus totalDifficulty shouldBe remoteStatus.totalDifficulty + bestBlockHash shouldBe remoteStatus.bestHash currentMaxBlockNumber shouldBe 0 forkAccepted shouldBe true case _ => fail @@ -92,9 +94,10 @@ class EtcHandshakerSpec extends FlatSpec with Matchers { assert(handshakerAfterStatusOpt.isDefined) handshakerAfterFork.get.nextMessage match { - case Left(HandshakeSuccess(PeerInfo(initialStatus, totalDifficulty, forkAccepted, currentMaxBlockNumber))) => + case Left(HandshakeSuccess(PeerInfo(initialStatus, totalDifficulty, forkAccepted, currentMaxBlockNumber, bestBlockHash))) => initialStatus shouldBe remoteStatus totalDifficulty shouldBe remoteStatus.totalDifficulty + bestBlockHash shouldBe remoteStatus.bestHash currentMaxBlockNumber shouldBe 0 forkAccepted shouldBe false case _ => fail From fae8700bcbdcaa09243a7b64ab8cb5de78b1533d Mon Sep 17 00:00:00 2001 From: KonradStaniec Date: Wed, 16 Sep 2020 15:48:59 +0200 Subject: [PATCH 4/9] [ETCM-102] Pr comments --- .../io/iohk/ethereum/sync/FastSyncItSpec.scala | 2 +- .../iohk/ethereum/network/EtcPeerManagerActor.scala | 11 +++++------ .../iohk/ethereum/network/EtcPeerManagerSpec.scala | 13 ++++++------- 3 files changed, 12 insertions(+), 14 deletions(-) diff --git a/src/it/scala/io/iohk/ethereum/sync/FastSyncItSpec.scala b/src/it/scala/io/iohk/ethereum/sync/FastSyncItSpec.scala index 743a9aea56..dd967ecdc3 100644 --- a/src/it/scala/io/iohk/ethereum/sync/FastSyncItSpec.scala +++ b/src/it/scala/io/iohk/ethereum/sync/FastSyncItSpec.scala @@ -433,7 +433,7 @@ object FastSyncItSpec { object FakePeer { def startFakePeer(peerName: String): Task[FakePeer] = { for { - peer <- Task(new FakePeer(peerName)).memoizeOnSuccess + peer <- Task(new FakePeer(peerName)) _ <- peer.startPeer() } yield peer } diff --git a/src/main/scala/io/iohk/ethereum/network/EtcPeerManagerActor.scala b/src/main/scala/io/iohk/ethereum/network/EtcPeerManagerActor.scala index 80794461e9..2626bd799d 100644 --- a/src/main/scala/io/iohk/ethereum/network/EtcPeerManagerActor.scala +++ b/src/main/scala/io/iohk/ethereum/network/EtcPeerManagerActor.scala @@ -193,9 +193,9 @@ class EtcPeerManagerActor(peerManagerActor: ActorRef, peerEventBusActor: ActorRe if (maxBlockNumber > appStateStorage.getEstimatedHighestBlock()) appStateStorage.putEstimatedHighestBlock(maxBlockNumber) - if (maxBlockNumber > initialPeerInfo.maxBlockNumber) - initialPeerInfo.withMaxBlockNumber(maxBlockNumber).withMaxBlockHash(maxBlockHash) - else + if (maxBlockNumber > initialPeerInfo.maxBlockNumber) { + initialPeerInfo.withBestBlockData(maxBlockNumber, maxBlockHash) + } else initialPeerInfo } } @@ -227,9 +227,8 @@ object EtcPeerManagerActor { def withForkAccepted(forkAccepted: Boolean): PeerInfo = copy(forkAccepted = forkAccepted) - def withMaxBlockNumber(maxBlockNumber: BigInt): PeerInfo = copy(maxBlockNumber = maxBlockNumber) - - def withMaxBlockHash(bestBlockHash: ByteString): PeerInfo = copy(bestBlockHash = bestBlockHash) + def withBestBlockData(maxBlockNumber: BigInt, bestBlockHash: ByteString): PeerInfo = + copy(maxBlockNumber = maxBlockNumber, bestBlockHash = bestBlockHash) } private case class PeerWithInfo(peer: Peer, peerInfo: PeerInfo) diff --git a/src/test/scala/io/iohk/ethereum/network/EtcPeerManagerSpec.scala b/src/test/scala/io/iohk/ethereum/network/EtcPeerManagerSpec.scala index 80c78d10bf..249fcc2bb4 100644 --- a/src/test/scala/io/iohk/ethereum/network/EtcPeerManagerSpec.scala +++ b/src/test/scala/io/iohk/ethereum/network/EtcPeerManagerSpec.scala @@ -61,9 +61,8 @@ class EtcPeerManagerSpec extends FlatSpec with Matchers { //then requestSender.send(peersInfoHolder, PeerInfoRequest(peer1.id)) val expectedPeerInfo = initialPeerInfo - .withMaxBlockNumber(initialPeerInfo.maxBlockNumber + 4) + .withBestBlockData(initialPeerInfo.maxBlockNumber + 4, firstHeader.hash) .withTotalDifficulty(newBlockTD) - .withMaxBlockHash(firstHeader.hash) requestSender.expectMsg(PeerInfoResponse(Some(expectedPeerInfo))) } @@ -81,7 +80,7 @@ class EtcPeerManagerSpec extends FlatSpec with Matchers { //then requestSender.send(peersInfoHolder, PeerInfoRequest(peer1.id)) requestSender.expectMsg(PeerInfoResponse( - Some(peer1Info.withMaxBlockNumber(initialPeerInfo.maxBlockNumber + 4).withMaxBlockHash(firstHeader.hash))) + Some(peer1Info.withBestBlockData(initialPeerInfo.maxBlockNumber + 4, firstHeader.hash))) ) } @@ -98,7 +97,7 @@ class EtcPeerManagerSpec extends FlatSpec with Matchers { //then requestSender.send(peersInfoHolder, PeerInfoRequest(peer1.id)) - requestSender.expectMsg(PeerInfoResponse(Some(peer1Info.withMaxBlockNumber(peer1Info.maxBlockNumber + 5).withMaxBlockHash(secondBlockHash.hash)))) + requestSender.expectMsg(PeerInfoResponse(Some(peer1Info.withBestBlockData(peer1Info.maxBlockNumber + 5, secondBlockHash.hash)))) } it should "update the peer total difficulty when receiving a NewBlock" in new TestSetup { @@ -185,7 +184,7 @@ class EtcPeerManagerSpec extends FlatSpec with Matchers { it should "provide handshaked peers only with best block number determined" in new TestSetup { peerEventBus.expectMsg(Subscribe(PeerHandshaked)) // Freshly handshaked peer without best block determined - setupNewPeer(freshPeer, freshPeerProbe, freshPeerInfo) + setupNewPeer(freshPeer, freshPeerProbe, freshPeerInfo.copy(maxBlockNumber = 0)) requestSender.send(peersInfoHolder, GetHandshakedPeers) requestSender.expectMsg(HandshakedPeers(Map.empty)) @@ -198,7 +197,7 @@ class EtcPeerManagerSpec extends FlatSpec with Matchers { // After receiving peer best block number, peer should be provided as handshaked peer requestSender.send(peersInfoHolder, GetHandshakedPeers) - requestSender.expectMsg(HandshakedPeers(Map(freshPeer -> freshPeerInfo.withMaxBlockNumber(newMaxBlock).withMaxBlockHash(firstHeader.hash)))) + requestSender.expectMsg(HandshakedPeers(Map(freshPeer -> freshPeerInfo.withBestBlockData(newMaxBlock, firstHeader.hash)))) } it should "provide handshaked peers only with best block number determined even if peers best block is its genesis" in new TestSetup { @@ -259,7 +258,7 @@ class EtcPeerManagerSpec extends FlatSpec with Matchers { val freshPeerProbe = TestProbe() val freshPeer = Peer(new InetSocketAddress("127.0.0.1", 4), freshPeerProbe.ref, false) - val freshPeerInfo = initialPeerInfo.withForkAccepted(false).withMaxBlockNumber(0) + val freshPeerInfo = initialPeerInfo.withForkAccepted(false) val peerManager = TestProbe() val peerEventBus = TestProbe() From 6ba4c8c7054ea9c30875fcf10f31d1c4d2a6a611 Mon Sep 17 00:00:00 2001 From: KonradStaniec Date: Thu, 17 Sep 2020 08:49:37 +0200 Subject: [PATCH 5/9] [ETCM-102] Custom constructors for PeerInfo. Rename things in tests --- .../scala/io/iohk/ethereum/sync/FastSyncItSpec.scala | 12 +++++++----- .../iohk/ethereum/network/EtcPeerManagerActor.scala | 10 ++++++++++ .../handshaker/EtcForkBlockExchangeState.scala | 4 ++-- .../handshaker/EtcNodeStatusExchangeState.scala | 2 +- 4 files changed, 20 insertions(+), 8 deletions(-) diff --git a/src/it/scala/io/iohk/ethereum/sync/FastSyncItSpec.scala b/src/it/scala/io/iohk/ethereum/sync/FastSyncItSpec.scala index dd967ecdc3..6729f58d46 100644 --- a/src/it/scala/io/iohk/ethereum/sync/FastSyncItSpec.scala +++ b/src/it/scala/io/iohk/ethereum/sync/FastSyncItSpec.scala @@ -71,6 +71,8 @@ class FastSyncItSpec extends AsyncFlatSpec with Matchers with BeforeAndAfter { _ <- peer1.waitForFastSyncFinish() } yield { val trie = peer1.getBestBlockTrie() + // due to the fact that function generating state is deterministic both peer2 and peer3 ends up with exactly same + // state, so peer1 can get whole trie from both of them. assert(peer1.bl.getBestBlockNumber() == peer2.bl.getBestBlockNumber() - peer2.testSyncConfig.targetBlockOffset) assert(peer1.bl.getBestBlockNumber() == peer3.bl.getBestBlockNumber() - peer3.testSyncConfig.targetBlockOffset) assert(trie.isDefined) @@ -83,7 +85,7 @@ class FastSyncItSpec extends AsyncFlatSpec with Matchers with BeforeAndAfter { for { _ <- peer2.importNBlocksToTheTopForm(peer2.getCurrentState(), 1000)(IdentityUpdate) _ <- peer1.connectToPeers(Set(peer2.node)) - _ <- peer2.syncUntil(2000)(IdentityUpdate).startAndForget + _ <- peer2.importBlocksUntil(2000)(IdentityUpdate).startAndForget _ <- peer1.startFastSync() _ <- peer1.waitForFastSyncFinish() } yield { @@ -127,7 +129,7 @@ object FastSyncItSpec { val IdentityUpdate: (BigInt, InMemoryWorldStateProxy) => InMemoryWorldStateProxy = (_, world) => world - def updateWorldWithNRandomAccounts(n:Int, world: InMemoryWorldStateProxy): InMemoryWorldStateProxy = { + def updateWorldWithNAccounts(n:Int, world: InMemoryWorldStateProxy): InMemoryWorldStateProxy = { val resultWorld = (0 until n).foldLeft(world) { (world, num) => val randomBalance = num val randomAddress = Address(num) @@ -144,7 +146,7 @@ object FastSyncItSpec { def updateStateAtBlock(blockWithUpdate: BigInt): (BigInt, InMemoryWorldStateProxy) => InMemoryWorldStateProxy = { (blockNr: BigInt, world: InMemoryWorldStateProxy) => if (blockNr == blockWithUpdate) { - updateWorldWithNRandomAccounts(1000, world) + updateWorldWithNAccounts(1000, world) } else { IdentityUpdate(blockNr, world) } @@ -393,7 +395,7 @@ object FastSyncItSpec { go(startState.bestBlock, startState.currentTd, startState.currentWorldState, n) } - def syncUntil(n: BigInt)(updateWorldForBlock: (BigInt, InMemoryWorldStateProxy) => InMemoryWorldStateProxy): Task[Unit] = { + def importBlocksUntil(n: BigInt)(updateWorldForBlock: (BigInt, InMemoryWorldStateProxy) => InMemoryWorldStateProxy): Task[Unit] = { Task(bl.getBestBlock()).flatMap { block => if (block.number >= n) { Task(()) @@ -405,7 +407,7 @@ object FastSyncItSpec { bl.save(newBlock, Seq(), newTd, saveAsBestBlock = true) bl.persistCachedNodes() broadcastBlock(newBlock, newTd) - }.flatMap(_ => syncUntil(n)(updateWorldForBlock)) + }.flatMap(_ => importBlocksUntil(n)(updateWorldForBlock)) } } } diff --git a/src/main/scala/io/iohk/ethereum/network/EtcPeerManagerActor.scala b/src/main/scala/io/iohk/ethereum/network/EtcPeerManagerActor.scala index 2626bd799d..f13dfeb452 100644 --- a/src/main/scala/io/iohk/ethereum/network/EtcPeerManagerActor.scala +++ b/src/main/scala/io/iohk/ethereum/network/EtcPeerManagerActor.scala @@ -231,6 +231,16 @@ object EtcPeerManagerActor { copy(maxBlockNumber = maxBlockNumber, bestBlockHash = bestBlockHash) } + object PeerInfo { + def apply(remoteStatus: Status, forkAccepted: Boolean): PeerInfo = { + PeerInfo(remoteStatus, remoteStatus.totalDifficulty, forkAccepted, 0, remoteStatus.bestHash) + } + + def withForkAccepted(remoteStatus: Status): PeerInfo = PeerInfo(remoteStatus, forkAccepted = true) + + def withNotForkAccepted(remoteStatus: Status): PeerInfo = PeerInfo(remoteStatus, forkAccepted = false) + } + private case class PeerWithInfo(peer: Peer, peerInfo: PeerInfo) case object GetHandshakedPeers diff --git a/src/main/scala/io/iohk/ethereum/network/handshaker/EtcForkBlockExchangeState.scala b/src/main/scala/io/iohk/ethereum/network/handshaker/EtcForkBlockExchangeState.scala index 8ce79078a8..a80009ea9c 100644 --- a/src/main/scala/io/iohk/ethereum/network/handshaker/EtcForkBlockExchangeState.scala +++ b/src/main/scala/io/iohk/ethereum/network/handshaker/EtcForkBlockExchangeState.scala @@ -35,7 +35,7 @@ case class EtcForkBlockExchangeState(handshakerConfiguration: EtcHandshakerConfi if (forkResolver.isAccepted(fork)) { log.debug("Fork is accepted") //setting maxBlockNumber to 0, as we do not know best block number yet - ConnectedState(PeerInfo(remoteStatus, remoteStatus.totalDifficulty, true, 0, remoteStatus.bestHash)) + ConnectedState(PeerInfo.withForkAccepted(remoteStatus)) } else { log.debug("Fork is not accepted") DisconnectedState[PeerInfo](Disconnect.Reasons.UselessPeer) @@ -43,7 +43,7 @@ case class EtcForkBlockExchangeState(handshakerConfiguration: EtcHandshakerConfi case None => log.debug("Peer did not respond with fork block header") - ConnectedState(PeerInfo(remoteStatus, remoteStatus.totalDifficulty, false, 0, remoteStatus.bestHash)) + ConnectedState(PeerInfo.withNotForkAccepted(remoteStatus)) } } diff --git a/src/main/scala/io/iohk/ethereum/network/handshaker/EtcNodeStatusExchangeState.scala b/src/main/scala/io/iohk/ethereum/network/handshaker/EtcNodeStatusExchangeState.scala index 1d176849ab..8ed7c0ad36 100644 --- a/src/main/scala/io/iohk/ethereum/network/handshaker/EtcNodeStatusExchangeState.scala +++ b/src/main/scala/io/iohk/ethereum/network/handshaker/EtcNodeStatusExchangeState.scala @@ -32,7 +32,7 @@ case class EtcNodeStatusExchangeState(handshakerConfiguration: EtcHandshakerConf case Some(forkResolver) => EtcForkBlockExchangeState(handshakerConfiguration, forkResolver, remoteStatus) case None => - ConnectedState(PeerInfo(remoteStatus, remoteStatus.totalDifficulty, true, 0, remoteStatus.bestHash)) + ConnectedState(PeerInfo.withForkAccepted(remoteStatus)) } } else DisconnectedState(Reasons.DisconnectRequested) From e2044c1745e23ad175bfa3795b650b0be2f4cf71 Mon Sep 17 00:00:00 2001 From: KonradStaniec Date: Fri, 18 Sep 2020 07:39:40 +0200 Subject: [PATCH 6/9] [ETCM-102] Fix typo. Clear up it tests --- .../iohk/ethereum/sync/FastSyncItSpec.scala | 50 ++++++------------- .../network/EtcPeerManagerActor.scala | 4 +- 2 files changed, 16 insertions(+), 38 deletions(-) diff --git a/src/it/scala/io/iohk/ethereum/sync/FastSyncItSpec.scala b/src/it/scala/io/iohk/ethereum/sync/FastSyncItSpec.scala index 6729f58d46..b5b35a1d41 100644 --- a/src/it/scala/io/iohk/ethereum/sync/FastSyncItSpec.scala +++ b/src/it/scala/io/iohk/ethereum/sync/FastSyncItSpec.scala @@ -15,9 +15,9 @@ import io.iohk.ethereum.blockchain.sync.regular.BlockBroadcasterActor.BroadcastB import io.iohk.ethereum.blockchain.sync.{BlockBroadcast, BlockchainHostActor, FastSync, TestSyncConfig} import io.iohk.ethereum.db.components.{SharedRocksDbDataSources, Storages} import io.iohk.ethereum.db.dataSource.{RocksDbConfig, RocksDbDataSource} -import io.iohk.ethereum.db.storage.{AppStateStorage, Namespaces} import io.iohk.ethereum.db.storage.pruning.{ArchivePruning, PruningMode} -import io.iohk.ethereum.domain.{Account, Address, Block, Blockchain, BlockchainImpl} +import io.iohk.ethereum.db.storage.{AppStateStorage, Namespaces} +import io.iohk.ethereum.domain._ import io.iohk.ethereum.ledger.InMemoryWorldStateProxy import io.iohk.ethereum.mpt.{HashNode, MerklePatriciaTrie, MptNode, MptTraversals} import io.iohk.ethereum.network.EtcPeerManagerActor.PeerInfo @@ -31,29 +31,28 @@ import io.iohk.ethereum.network.rlpx.AuthHandshaker import io.iohk.ethereum.network.rlpx.RLPxConnectionHandler.RLPxConfiguration import io.iohk.ethereum.network.{EtcPeerManagerActor, ForkResolver, KnownNodesManager, PeerEventBusActor, PeerManagerActor, ServerActor} import io.iohk.ethereum.nodebuilder.{PruningConfigBuilder, SecureRandomBuilder} -import io.iohk.ethereum.sync.FastSyncItSpec.{FakePeer, IdentityUpdate, customTestCaseResourceM, updateStateAtBlock} +import io.iohk.ethereum.sync.FastSyncItSpec.{FakePeer, IdentityUpdate, updateStateAtBlock} import io.iohk.ethereum.utils.ServerStatus.Listening import io.iohk.ethereum.utils.{Config, NodeStatus, ServerStatus, VmConfig} import io.iohk.ethereum.vm.EvmConfig -import io.iohk.ethereum.{Fixtures, Timeouts} +import io.iohk.ethereum.{Fixtures, FlatSpecBase, Timeouts} import monix.eval.Task import monix.execution.Scheduler -import org.scalatest.{Assertion, AsyncFlatSpec, BeforeAndAfter, Matchers} +import org.scalatest.{BeforeAndAfter, Matchers} -import scala.concurrent.Future import scala.concurrent.duration._ import scala.util.Try -class FastSyncItSpec extends AsyncFlatSpec with Matchers with BeforeAndAfter { +class FastSyncItSpec extends FlatSpecBase with Matchers with BeforeAndAfter { implicit val testScheduler = Scheduler.fixedPool("test", 16) "FastSync" should "should sync blockchain without state nodes" in customTestCaseResourceM(FakePeer.start3FakePeersRes()) { case (peer1, peer2, peer3) => for { - _ <- peer2.importNBlocksToTheTopForm(peer2.getCurrentState(), 1000)(IdentityUpdate) - _ <- peer3.importNBlocksToTheTopForm(peer3.getCurrentState(), 1000)(IdentityUpdate) + _ <- peer2.importBlocksUntil(1000)(IdentityUpdate) + _ <- peer3.importBlocksUntil(1000)(IdentityUpdate) _ <- peer1.connectToPeers(Set(peer2.node, peer3.node)) - _ <- peer1.startFastSync() + _ <- peer1.startFastSync().delayExecution(50.milliseconds) _ <- peer1.waitForFastSyncFinish() } yield { assert(peer1.bl.getBestBlockNumber() == peer2.bl.getBestBlockNumber() - peer2.testSyncConfig.targetBlockOffset) @@ -64,10 +63,10 @@ class FastSyncItSpec extends AsyncFlatSpec with Matchers with BeforeAndAfter { it should "should sync blockchain with state nodes" in customTestCaseResourceM(FakePeer.start3FakePeersRes()) { case (peer1, peer2, peer3) => for { - _ <- peer2.importNBlocksToTheTopForm(peer2.getCurrentState(), 1000)(updateStateAtBlock(500)) - _ <- peer3.importNBlocksToTheTopForm(peer3.getCurrentState(), 1000)(updateStateAtBlock(500)) + _ <- peer2.importBlocksUntil(1000)(updateStateAtBlock(500)) + _ <- peer3.importBlocksUntil(1000)(updateStateAtBlock(500)) _ <- peer1.connectToPeers(Set(peer2.node, peer3.node)) - _ <- peer1.startFastSync() + _ <- peer1.startFastSync().delayExecution(50.milliseconds) _ <- peer1.waitForFastSyncFinish() } yield { val trie = peer1.getBestBlockTrie() @@ -83,10 +82,10 @@ class FastSyncItSpec extends AsyncFlatSpec with Matchers with BeforeAndAfter { it should "should update target block" in customTestCaseResourceM(FakePeer.start2FakePeersRes()) { case (peer1, peer2) => for { - _ <- peer2.importNBlocksToTheTopForm(peer2.getCurrentState(), 1000)(IdentityUpdate) + _ <- peer2.importBlocksUntil(1000)(IdentityUpdate) _ <- peer1.connectToPeers(Set(peer2.node)) _ <- peer2.importBlocksUntil(2000)(IdentityUpdate).startAndForget - _ <- peer1.startFastSync() + _ <- peer1.startFastSync().delayExecution(50.milliseconds) _ <- peer1.waitForFastSyncFinish() } yield { assert(peer1.bl.getBestBlockNumber() == peer2.bl.getBestBlockNumber() - peer2.testSyncConfig.targetBlockOffset) @@ -120,11 +119,6 @@ object FastSyncItSpec { } } - def customTestCaseResourceM[T](fixture: Resource[Task, T]) - (theTest: T => Task[Assertion])(implicit s: Scheduler): Future[Assertion] = { - fixture.use(theTest).runToFuture - } - final case class BlockchainState(bestBlock: Block, currentWorldState: InMemoryWorldStateProxy, currentTd: BigInt) val IdentityUpdate: (BigInt, InMemoryWorldStateProxy) => InMemoryWorldStateProxy = (_, world) => world @@ -379,22 +373,6 @@ object FastSyncItSpec { (newBlock, newTd, parentWorld) } - def importNBlocksToTheTopForm(startState: BlockchainState, n: Int) - (updateWorldForBlock: (BigInt, InMemoryWorldStateProxy) => InMemoryWorldStateProxy): Task[Unit] = { - def go(parent: Block, parentTd: BigInt, parentWorld: InMemoryWorldStateProxy, blocksLeft: Int): Task[Unit] = { - if (blocksLeft <= 0) { - Task.now(()) - } else { - val (newBlock, newTd, newWorld) = createChildBlock(parent, parentTd, parentWorld)(updateWorldForBlock) - bl.save(newBlock, Seq(), newTd, saveAsBestBlock = true) - bl.persistCachedNodes() - go(newBlock, newTd, newWorld, blocksLeft - 1) - } - } - - go(startState.bestBlock, startState.currentTd, startState.currentWorldState, n) - } - def importBlocksUntil(n: BigInt)(updateWorldForBlock: (BigInt, InMemoryWorldStateProxy) => InMemoryWorldStateProxy): Task[Unit] = { Task(bl.getBestBlock()).flatMap { block => if (block.number >= n) { diff --git a/src/main/scala/io/iohk/ethereum/network/EtcPeerManagerActor.scala b/src/main/scala/io/iohk/ethereum/network/EtcPeerManagerActor.scala index f13dfeb452..80410fac11 100644 --- a/src/main/scala/io/iohk/ethereum/network/EtcPeerManagerActor.scala +++ b/src/main/scala/io/iohk/ethereum/network/EtcPeerManagerActor.scala @@ -38,7 +38,7 @@ class EtcPeerManagerActor(peerManagerActor: ActorRef, peerEventBusActor: ActorRe def handleMessages(peersWithInfo: PeersWithInfo): Receive = handleCommonMessages(peersWithInfo) orElse handlePeersInfoEvents(peersWithInfo) - private def peerHasUdpatedBestBlock(peerInfo: PeerInfo): Boolean = { + private def peerHasUpdatedBestBlock(peerInfo: PeerInfo): Boolean = { val peerBestBlockIsItsGenesisBlock = peerInfo.bestBlockHash == peerInfo.remoteStatus.genesisHash peerBestBlockIsItsGenesisBlock || (!peerBestBlockIsItsGenesisBlock && peerInfo.maxBlockNumber > 0) } @@ -52,7 +52,7 @@ class EtcPeerManagerActor(peerManagerActor: ActorRef, peerEventBusActor: ActorRe // Provide only peers which already responded to request for best block hash, and theirs best block hash is different // form their genesis block sender() ! HandshakedPeers(peersWithInfo.collect { - case (_, PeerWithInfo(peer, peerInfo)) if peerHasUdpatedBestBlock(peerInfo) => peer -> peerInfo + case (_, PeerWithInfo(peer, peerInfo)) if peerHasUpdatedBestBlock(peerInfo) => peer -> peerInfo }) case PeerInfoRequest(peerId) => From cd44ef16b67c710bb13fc269ed6f0a8f812a33b5 Mon Sep 17 00:00:00 2001 From: KonradStaniec Date: Mon, 21 Sep 2020 11:35:25 +0200 Subject: [PATCH 7/9] [ETCM-102] Fix race condition in frame encoder/decoder --- .../io/iohk/ethereum/network/PeerActor.scala | 63 ++++++----- .../ethereum/network/rlpx/MessageCodec.scala | 32 +++++- .../network/rlpx/RLPxConnectionHandler.scala | 68 ++++++------ .../network/p2p/MessageCodecSpec.scala | 100 ++++++++++++++++++ .../ethereum/network/p2p/PeerActorSpec.scala | 17 +-- .../rlpx/RLPxConnectionHandlerSpec.scala | 20 ++-- 6 files changed, 204 insertions(+), 96 deletions(-) create mode 100644 src/test/scala/io/iohk/ethereum/network/p2p/MessageCodecSpec.scala diff --git a/src/main/scala/io/iohk/ethereum/network/PeerActor.scala b/src/main/scala/io/iohk/ethereum/network/PeerActor.scala index 4f15d72b56..bd512b8b84 100644 --- a/src/main/scala/io/iohk/ethereum/network/PeerActor.scala +++ b/src/main/scala/io/iohk/ethereum/network/PeerActor.scala @@ -1,22 +1,22 @@ package io.iohk.ethereum.network -import java.net.{ InetSocketAddress, URI } +import java.net.{InetSocketAddress, URI} import akka.actor.SupervisorStrategy.Escalate import akka.actor._ import akka.util.ByteString import io.iohk.ethereum.network.PeerActor.Status._ -import io.iohk.ethereum.network.PeerEventBusActor.PeerEvent.{ MessageFromPeer, PeerHandshakeSuccessful } +import io.iohk.ethereum.network.PeerEventBusActor.PeerEvent.{MessageFromPeer, PeerHandshakeSuccessful} import io.iohk.ethereum.network.PeerEventBusActor.Publish import io.iohk.ethereum.network.PeerManagerActor.PeerConfiguration import io.iohk.ethereum.network.handshaker.Handshaker -import io.iohk.ethereum.network.handshaker.Handshaker.HandshakeComplete.{ HandshakeFailure, HandshakeSuccess } -import io.iohk.ethereum.network.handshaker.Handshaker.{ HandshakeResult, NextMessage } +import io.iohk.ethereum.network.handshaker.Handshaker.HandshakeComplete.{HandshakeFailure, HandshakeSuccess} +import io.iohk.ethereum.network.handshaker.Handshaker.{HandshakeResult, NextMessage} import io.iohk.ethereum.network.p2p._ import io.iohk.ethereum.network.p2p.messages.Versions import io.iohk.ethereum.network.p2p.messages.WireProtocol._ import io.iohk.ethereum.network.rlpx.RLPxConnectionHandler.RLPxConfiguration -import io.iohk.ethereum.network.rlpx.{ AuthHandshaker, RLPxConnectionHandler } +import io.iohk.ethereum.network.rlpx.{AuthHandshaker, RLPxConnectionHandler} import org.bouncycastle.util.encoders.Hex @@ -28,18 +28,18 @@ import org.bouncycastle.util.encoders.Hex * Once that's done it can send/receive messages with peer (HandshakedHandler.receive). */ class PeerActor[R <: HandshakeResult]( - peerAddress: InetSocketAddress, - rlpxConnectionFactory: ActorContext => ActorRef, - val peerConfiguration: PeerConfiguration, - peerEventBus: ActorRef, - knownNodesManager: ActorRef, - incomingConnection: Boolean, - externalSchedulerOpt: Option[Scheduler] = None, - initHandshaker: Handshaker[R]) + peerAddress: InetSocketAddress, + rlpxConnectionFactory: ActorContext => ActorRef, + val peerConfiguration: PeerConfiguration, + peerEventBus: ActorRef, + knownNodesManager: ActorRef, + incomingConnection: Boolean, + externalSchedulerOpt: Option[Scheduler] = None, + initHandshaker: Handshaker[R]) extends Actor with ActorLogging with Stash { import PeerActor._ - import context.{ dispatcher, system } + import context.{dispatcher, system} override val supervisorStrategy: OneForOneStrategy = OneForOneStrategy() { @@ -73,6 +73,7 @@ class PeerActor[R <: HandshakeResult]( context watch ref RLPxConnection(ref, remoteAddress, uriOpt) } + private def modifyOutGoingUri(remoteNodeId: ByteString, rlpxConnection: RLPxConnection, uri: URI): URI = { val host = getHostName(rlpxConnection.remoteAddress.getAddress) val port = rlpxConnection.remoteAddress.getPort @@ -106,23 +107,14 @@ class PeerActor[R <: HandshakeResult]( def processingHandshaking(handshaker: Handshaker[R], rlpxConnection: RLPxConnection, timeout: Cancellable, numRetries: Int): Receive = - handleTerminated(rlpxConnection, numRetries, Handshaking(numRetries)) orElse + handleTerminated(rlpxConnection, numRetries, Handshaking(numRetries)) orElse handleDisconnectMsg(rlpxConnection, Handshaking(numRetries)) orElse handlePingMsg(rlpxConnection) orElse stashMessages orElse { case RLPxConnectionHandler.MessageReceived(msg) => - - // We need to determine p2p version just after hello message as next messages in handshake - // can be compressed. - msg match { - case Hello(p2pVersion, _, _, _, _) => - rlpxConnection.ref ! PeerP2pVersion(p2pVersion) - case _ => () - } - // Processes the received message, cancels the timeout and processes a new message but only if the handshaker // handles the received message - handshaker.applyMessage(msg).foreach{ newHandshaker => + handshaker.applyMessage(msg).foreach { newHandshaker => timeout.cancel() processHandshakerNextMessage(newHandshaker, rlpxConnection, numRetries) } @@ -143,7 +135,7 @@ class PeerActor[R <: HandshakeResult]( * * @param handshaker * @param rlpxConnection - * @param numRetries, number of connection retries done during RLPxConnection establishment + * @param numRetries , number of connection retries done during RLPxConnection establishment */ private def processHandshakerNextMessage(handshaker: Handshaker[R], rlpxConnection: RLPxConnection, @@ -155,7 +147,7 @@ class PeerActor[R <: HandshakeResult]( context become processingHandshaking(handshaker, rlpxConnection, newTimeout, numRetries) case Left(HandshakeSuccess(handshakeResult)) => - rlpxConnection.uriOpt.foreach { uri =>knownNodesManager ! KnownNodesManager.AddKnownNode(uri) } + rlpxConnection.uriOpt.foreach { uri => knownNodesManager ! KnownNodesManager.AddKnownNode(uri) } context become new HandshakedPeer(rlpxConnection, handshakeResult).receive unstashAll() @@ -252,8 +244,8 @@ class PeerActor[R <: HandshakeResult]( */ def receive: Receive = handlePingMsg(rlpxConnection) orElse - handleDisconnectMsg(rlpxConnection, Handshaked) orElse - handleTerminated(rlpxConnection, 0, Handshaked) orElse { + handleDisconnectMsg(rlpxConnection, Handshaked) orElse + handleTerminated(rlpxConnection, 0, Handshaked) orElse { case RLPxConnectionHandler.MessageReceived(message) => log.debug(s"Received message: {} from $peerId", message) @@ -268,7 +260,7 @@ class PeerActor[R <: HandshakeResult]( case GetStatus => sender() ! StatusResponse(Handshaked) - } + } } } @@ -304,8 +296,6 @@ object PeerActor { } } - case class PeerP2pVersion(p2pVersion: Long) - case class HandleConnection(connection: ActorRef, remoteAddress: InetSocketAddress) case class IncomingConnectionHandshakeSuccess(peer: Peer) @@ -321,16 +311,25 @@ object PeerActor { private case object ResponseTimeout case object GetStatus + case class StatusResponse(status: Status) case class DisconnectPeer(reason: Int) sealed trait Status + object Status { + case object Idle extends Status + case object Connecting extends Status + case class Handshaking(numRetries: Int) extends Status + case object Handshaked extends Status + case object Disconnected extends Status + } + } diff --git a/src/main/scala/io/iohk/ethereum/network/rlpx/MessageCodec.scala b/src/main/scala/io/iohk/ethereum/network/rlpx/MessageCodec.scala index 1aafc12f67..fd395417e4 100644 --- a/src/main/scala/io/iohk/ethereum/network/rlpx/MessageCodec.scala +++ b/src/main/scala/io/iohk/ethereum/network/rlpx/MessageCodec.scala @@ -4,8 +4,10 @@ import java.util.concurrent.atomic.AtomicInteger import akka.util.ByteString import io.iohk.ethereum.network.handshaker.EtcHelloExchangeState +import io.iohk.ethereum.network.p2p.messages.WireProtocol.Hello import io.iohk.ethereum.network.p2p.{Message, MessageDecoder, MessageSerializable} import org.xerial.snappy.Snappy + import scala.util.{Failure, Success, Try} class MessageCodec(frameCodec: FrameCodec, messageDecoder: MessageDecoder, protocolVersion: Message.Version) { @@ -17,19 +19,37 @@ class MessageCodec(frameCodec: FrameCodec, messageDecoder: MessageDecoder, proto // 16Mb in base 2 val maxDecompressedLength = 16777216 - def readMessages(data: ByteString, p2pVersion: Option[Long]): Seq[Try[Message]] = { + // MessageCodec is only used from actor context so it can be var + @volatile + private var remotePeerP2pVersion: Option[Long] = None + + private def setRemoteVersionBasedOnHelloMessage(m: Message): Unit = { + if (remotePeerP2pVersion.isEmpty) { + m match { + case hello: Hello => + remotePeerP2pVersion = Some(hello.p2pVersion) + case _ => + } + } + } + + def readMessages(data: ByteString): Seq[Try[Message]] = { val frames = frameCodec.readFrames(data) frames map { frame => val frameData = frame.payload.toArray val payloadTry = - if (p2pVersion.contains(EtcHelloExchangeState.P2pVersion)){ + if (remotePeerP2pVersion.exists(version => version >= EtcHelloExchangeState.P2pVersion)) { decompressData(frameData) } else { Success(frameData) } - payloadTry.map(payload => messageDecoder.fromBytes(frame.`type`, payload, protocolVersion)) + payloadTry.map { payload => + val m = messageDecoder.fromBytes(frame.`type`, payload, protocolVersion) + setRemoteVersionBasedOnHelloMessage(m) + m + } } } @@ -42,14 +62,16 @@ class MessageCodec(frameCodec: FrameCodec, messageDecoder: MessageDecoder, proto } } - def encodeMessage(serializable: MessageSerializable, p2pVersion: Option[Long]): ByteString = { + def encodeMessage(serializable: MessageSerializable): ByteString = { val encoded: Array[Byte] = serializable.toBytes val numFrames = Math.ceil(encoded.length / MaxFramePayloadSize.toDouble).toInt val contextId = contextIdCounter.incrementAndGet() val frames = (0 until numFrames) map { frameNo => val framedPayload = encoded.drop(frameNo * MaxFramePayloadSize).take(MaxFramePayloadSize) val payload = - if (p2pVersion.contains(EtcHelloExchangeState.P2pVersion)){ + if ( + remotePeerP2pVersion.exists(version => version >= EtcHelloExchangeState.P2pVersion) && serializable.code != Hello.code + ) { Snappy.compress(framedPayload) } else { framedPayload diff --git a/src/main/scala/io/iohk/ethereum/network/rlpx/RLPxConnectionHandler.scala b/src/main/scala/io/iohk/ethereum/network/rlpx/RLPxConnectionHandler.scala index ea6357d7c1..d6b502e769 100644 --- a/src/main/scala/io/iohk/ethereum/network/rlpx/RLPxConnectionHandler.scala +++ b/src/main/scala/io/iohk/ethereum/network/rlpx/RLPxConnectionHandler.scala @@ -6,7 +6,6 @@ import akka.actor._ import akka.io.Tcp._ import akka.io.{IO, Tcp} import akka.util.ByteString -import io.iohk.ethereum.network.PeerActor.PeerP2pVersion import io.iohk.ethereum.network.p2p.{Message, MessageDecoder, MessageSerializable} import io.iohk.ethereum.network.rlpx.RLPxConnectionHandler.RLPxConfiguration import io.iohk.ethereum.utils.ByteUtils @@ -24,15 +23,15 @@ import scala.util.{Failure, Success, Try} * 1. when created it waits for initial command (either handle incoming connection or connect usin g uri) * 2. when new connection is requested the actor waits for the result (waitingForConnectionResult) * 3. once underlying connection is established it either waits for handshake init message or for response message - * (depending on who initiated the connection) + * (depending on who initiated the connection) * 4. once handshake is done (and secure connection established) actor can send/receive messages (`handshaked` state) */ class RLPxConnectionHandler( - messageDecoder: MessageDecoder, - protocolVersion: Message.Version, - authHandshaker: AuthHandshaker, - messageCodecFactory: (Secrets, MessageDecoder, Message.Version) => MessageCodec, - rlpxConfiguration: RLPxConfiguration) + messageDecoder: MessageDecoder, + protocolVersion: Message.Version, + authHandshaker: AuthHandshaker, + messageCodecFactory: (Secrets, MessageDecoder, Message.Version) => MessageCodec, + rlpxConfiguration: RLPxConfiguration) extends Actor with ActorLogging { import AuthHandshaker.{InitiatePacketLength, ResponsePacketLength} @@ -126,10 +125,10 @@ class RLPxConnectionHandler( /** * Decode V4 packet * - * @param data, includes both the V4 packet with bytes from next messages + * @param data , includes both the V4 packet with bytes from next messages * @return data of the packet and the remaining data */ - private def decodeV4Packet(data: ByteString): (ByteString, ByteString) = { + private def decodeV4Packet(data: ByteString): (ByteString, ByteString) = { val encryptedPayloadSize = ByteUtils.bigEndianToShort(data.take(2).toArray) val (packetData, remainingData) = data.splitAt(encryptedPayloadSize + 2) packetData -> remainingData @@ -148,7 +147,7 @@ class RLPxConnectionHandler( log.debug(s"Auth handshake succeeded for peer $peerId") context.parent ! ConnectionEstablished(remotePubKey) val messageCodec = messageCodecFactory(secrets, messageDecoder, protocolVersion) - val messagesSoFar = messageCodec.readMessages(remainingData ,None) + val messagesSoFar = messageCodec.readMessages(remainingData) messagesSoFar foreach processMessage context become handshaked(messageCodec) @@ -170,25 +169,24 @@ class RLPxConnectionHandler( * Handles sending and receiving messages from the Akka TCP connection, while also handling acknowledgement of * messages sent. Messages are only sent when all Ack from previous messages were received. * - * @param messageCodec, for encoding the messages sent - * @param messagesNotSent, messages not yet sent - * @param cancellableAckTimeout, timeout for the message sent for which we are awaiting an acknowledgement (if there is one) - * @param seqNumber, sequence number for the next message to be sent + * @param messageCodec , for encoding the messages sent + * @param messagesNotSent , messages not yet sent + * @param cancellableAckTimeout , timeout for the message sent for which we are awaiting an acknowledgement (if there is one) + * @param seqNumber , sequence number for the next message to be sent */ def handshaked(messageCodec: MessageCodec, messagesNotSent: Queue[MessageSerializable] = Queue.empty, cancellableAckTimeout: Option[CancellableAckTimeout] = None, - seqNumber: Int = 0, - p2pVersion: Option[Long] = None): Receive = + seqNumber: Int = 0): Receive = handleWriteFailed orElse handleConnectionClosed orElse { case sm: SendMessage => - if(cancellableAckTimeout.isEmpty) - sendMessage(messageCodec, sm.serializable, seqNumber, messagesNotSent, p2pVersion) + if (cancellableAckTimeout.isEmpty) + sendMessage(messageCodec, sm.serializable, seqNumber, messagesNotSent) else - context become handshaked(messageCodec, messagesNotSent :+ sm.serializable, cancellableAckTimeout, seqNumber, p2pVersion) + context become handshaked(messageCodec, messagesNotSent :+ sm.serializable, cancellableAckTimeout, seqNumber) case Received(data) => - val messages = messageCodec.readMessages(data, p2pVersion) + val messages = messageCodec.readMessages(data) messages foreach processMessage case Ack if cancellableAckTimeout.nonEmpty => @@ -196,33 +194,29 @@ class RLPxConnectionHandler( cancellableAckTimeout.foreach(_.cancellable.cancel()) //Send next message if there is one - if(messagesNotSent.nonEmpty) - sendMessage(messageCodec, messagesNotSent.head, seqNumber, messagesNotSent.tail, p2pVersion) + if (messagesNotSent.nonEmpty) + sendMessage(messageCodec, messagesNotSent.head, seqNumber, messagesNotSent.tail) else - context become handshaked(messageCodec, Queue.empty, None, seqNumber, p2pVersion) + context become handshaked(messageCodec, Queue.empty, None, seqNumber) case AckTimeout(ackSeqNumber) if cancellableAckTimeout.exists(_.seqNumber == ackSeqNumber) => cancellableAckTimeout.foreach(_.cancellable.cancel()) log.debug(s"[Stopping Connection] Write to $peerId failed") context stop self - - case PeerP2pVersion(p2pVer) => - // We have peer p2p version based on hello message, if version is >= 5 next messages will be compressed. - context.become(handshaked(messageCodec, messagesNotSent, cancellableAckTimeout, seqNumber, Some(p2pVer))) } /** * Sends an encoded message through the TCP connection, an Ack will be received when the message was * successfully queued for delivery. A cancellable timeout is created for the Ack message. * - * @param messageCodec, for encoding the messages sent - * @param messageToSend, message to be sent - * @param seqNumber, sequence number for the message to be sent - * @param remainingMsgsToSend, messages not yet sent + * @param messageCodec , for encoding the messages sent + * @param messageToSend , message to be sent + * @param seqNumber , sequence number for the message to be sent + * @param remainingMsgsToSend , messages not yet sent */ private def sendMessage(messageCodec: MessageCodec, messageToSend: MessageSerializable, - seqNumber: Int, remainingMsgsToSend: Queue[MessageSerializable], p2pVersion: Option[Long]): Unit = { - val out = messageCodec.encodeMessage(messageToSend, p2pVersion) + seqNumber: Int, remainingMsgsToSend: Queue[MessageSerializable]): Unit = { + val out = messageCodec.encodeMessage(messageToSend) connection ! Write(out, Ack) log.debug(s"Sent message: $messageToSend from $peerId") @@ -231,15 +225,14 @@ class RLPxConnectionHandler( messageCodec = messageCodec, messagesNotSent = remainingMsgsToSend, cancellableAckTimeout = Some(CancellableAckTimeout(seqNumber, timeout)), - seqNumber = increaseSeqNumber(seqNumber), - p2pVersion = p2pVersion + seqNumber = increaseSeqNumber(seqNumber) ) } /** * Given a sequence number for the AckTimeouts, the next seq number is returned * - * @param seqNumber, the current sequence number + * @param seqNumber , the current sequence number * @return the sequence number for the next message sent */ private def increaseSeqNumber(seqNumber: Int): Int = seqNumber match { @@ -258,13 +251,14 @@ class RLPxConnectionHandler( if (msg.isPeerClosed) { log.debug(s"[Stopping Connection] Connection with $peerId closed by peer") } - if(msg.isErrorClosed){ + if (msg.isErrorClosed) { log.debug(s"[Stopping Connection] Connection with $peerId closed because of error ${msg.getErrorCause}") } context stop self } } + } object RLPxConnectionHandler { diff --git a/src/test/scala/io/iohk/ethereum/network/p2p/MessageCodecSpec.scala b/src/test/scala/io/iohk/ethereum/network/p2p/MessageCodecSpec.scala new file mode 100644 index 0000000000..a55b54da17 --- /dev/null +++ b/src/test/scala/io/iohk/ethereum/network/p2p/MessageCodecSpec.scala @@ -0,0 +1,100 @@ +package io.iohk.ethereum.network.p2p + +import akka.util.ByteString +import io.iohk.ethereum.network.handshaker.EtcHelloExchangeState +import io.iohk.ethereum.network.p2p.messages.CommonMessages.Status +import io.iohk.ethereum.network.p2p.messages.Versions +import io.iohk.ethereum.network.p2p.messages.WireProtocol.{Capability, Hello} +import io.iohk.ethereum.network.rlpx.{FrameCodec, MessageCodec} +import io.iohk.ethereum.utils.Config +import org.scalatest.{FlatSpec, Matchers} + +class MessageCodecSpec extends FlatSpec with Matchers { + + it should "not compress messages when remote side advertises p2p version less than 5" in new TestSetup { + val remoteHello = remoteMessageCodec.encodeMessage(helloV4) + val localReceviveRemoteHello = messageCodec.readMessages(remoteHello) + + val localNextMessageAfterHello = messageCodec.encodeMessage(status) + val remoteReadNotCompressedStatus = remoteMessageCodec.readMessages(localNextMessageAfterHello) + + // remote peer did not receive local status so it treats all remote messages as uncompressed + assert(remoteReadNotCompressedStatus.size == 1) + assert(remoteReadNotCompressedStatus.head.get == status) + } + + it should "compress messages when remote side advertises p2p version larger or equal 5" in new TestSetup { + val remoteHello = remoteMessageCodec.encodeMessage(helloV5) + val localReceviveRemoteHello = messageCodec.readMessages(remoteHello) + + val localNextMessageAfterHello = messageCodec.encodeMessage(status) + val remoteReadNotCompressedStatus = remoteMessageCodec.readMessages(localNextMessageAfterHello) + + // remote peer did not receive local status so it treats all remote messages as uncompressed, + // but local peer compress messages after V5 Hello message + assert(remoteReadNotCompressedStatus.size == 1) + assert(remoteReadNotCompressedStatus.head.isFailure) + } + + it should "compress messages when both sides advertises p2p version larger or equal 5" in new TestSetup { + val remoteHello = remoteMessageCodec.encodeMessage(helloV5) + val localReceviveRemoteHello = messageCodec.readMessages(remoteHello) + + val localHello = messageCodec.encodeMessage(helloV5) + val remoteReceivedLocalHello = remoteMessageCodec.readMessages(localHello) + + val localNextMessageAfterHello = messageCodec.encodeMessage(status) + val remoteReadNextMessageAfterHello = remoteMessageCodec.readMessages(localNextMessageAfterHello) + + // both peers exchanged v5 hellos, so they should send compressed messages + assert(remoteReadNextMessageAfterHello.size == 1) + assert(remoteReadNextMessageAfterHello.head.get == status) + } + + it should "compress and decompress first message after hello when receiving 2 frames" in new TestSetup { + val remoteHello = remoteMessageCodec.encodeMessage(helloV5) + val localReceviveRemoteHello = messageCodec.readMessages(remoteHello) + + // hello won't be compressed as per spec it never is, and status will be compressed as remote peer advertised proper versions + val localHello = messageCodec.encodeMessage(helloV5) + val localStatus = messageCodec.encodeMessage(status) + + // both messages will be read at one, but after reading hello decompressing will be activated + val remoteReadBothMessages = remoteMessageCodec.readMessages(localHello ++ localStatus) + + // both peers exchanged v5 hellos, so they should send compressed messages + assert(remoteReadBothMessages.size == 2) + assert(remoteReadBothMessages.head.get == helloV5) + assert(remoteReadBothMessages.last.get == status) + } + + trait TestSetup extends SecureChannelSetup { + val frameCodec = new FrameCodec(secrets) + val remoteFrameCodec = new FrameCodec(remoteSecrets) + + val helloV5 = Hello( + p2pVersion = EtcHelloExchangeState.P2pVersion, + clientId = Config.clientId, + capabilities = Seq(Capability("eth", Versions.PV63.toByte)), + listenPort = 0, //Local node not listening + nodeId = ByteString(1) + ) + + val helloV4 = helloV5.copy(p2pVersion = 4) + + val status = Status( + protocolVersion = Versions.PV63, + networkId = Config.Network.peer.networkId, + totalDifficulty = 1, + bestHash = ByteString(1), + genesisHash = ByteString(1) + ) + + + val messageCodec = new MessageCodec(frameCodec, EthereumMessageDecoder, Versions.PV63) + val remoteMessageCodec = new MessageCodec(remoteFrameCodec, EthereumMessageDecoder, Versions.PV63) + + } + + +} diff --git a/src/test/scala/io/iohk/ethereum/network/p2p/PeerActorSpec.scala b/src/test/scala/io/iohk/ethereum/network/p2p/PeerActorSpec.scala index a8db3e7d33..ca24b7f333 100644 --- a/src/test/scala/io/iohk/ethereum/network/p2p/PeerActorSpec.scala +++ b/src/test/scala/io/iohk/ethereum/network/p2p/PeerActorSpec.scala @@ -13,7 +13,7 @@ import io.iohk.ethereum.crypto.generateKeyPair import io.iohk.ethereum.db.storage.AppStateStorage import io.iohk.ethereum.domain._ import io.iohk.ethereum.network.PeerActor.Status.Handshaked -import io.iohk.ethereum.network.PeerActor.{GetStatus, PeerP2pVersion, StatusResponse} +import io.iohk.ethereum.network.PeerActor.{GetStatus, StatusResponse} import io.iohk.ethereum.network.PeerManagerActor.{FastSyncHostConfiguration, PeerConfiguration} import io.iohk.ethereum.network.handshaker.{EtcHandshaker, EtcHandshakerConfiguration} import io.iohk.ethereum.network.p2p.messages.CommonMessages.Status @@ -81,9 +81,9 @@ class PeerActorSpec extends FlatSpec with Matchers { val knownNodesManager = TestProbe() val peer = TestActorRef(Props(new PeerActor(new InetSocketAddress("127.0.0.1", 0), _ => { - rlpxConnection = TestProbe() - rlpxConnection.ref - }, peerConf, peerMessageBus, knownNodesManager.ref, false, Some(time.scheduler), + rlpxConnection = TestProbe() + rlpxConnection.ref + }, peerConf, peerMessageBus, knownNodesManager.ref, false, Some(time.scheduler), handshaker))) peer ! PeerActor.ConnectTo(new URI("encode://localhost:9000")) @@ -115,7 +115,6 @@ class PeerActorSpec extends FlatSpec with Matchers { rlpxConnection.expectMsgPF() { case RLPxConnectionHandler.SendMessage(_: HelloEnc) => () } rlpxConnection.send(peer, RLPxConnectionHandler.MessageReceived(remoteHello)) - rlpxConnection.expectMsgPF() { case PeerP2pVersion(version) if version == remoteHello.p2pVersion => ()} val remoteStatus = Status( protocolVersion = Versions.PV63, networkId = peerConf.networkId, @@ -151,7 +150,6 @@ class PeerActorSpec extends FlatSpec with Matchers { val remoteHello = Hello(4, "test-client", Seq(Capability("eth", Versions.PV63.toByte)), 9000, ByteString("unused")) rlpxConnection.expectMsgPF() { case RLPxConnectionHandler.SendMessage(_: HelloEnc) => () } rlpxConnection.send(peer, RLPxConnectionHandler.MessageReceived(remoteHello)) - rlpxConnection.expectMsgPF() { case PeerP2pVersion(version) if version == remoteHello.p2pVersion => ()} val remoteStatus = Status( protocolVersion = Versions.PV63, @@ -185,7 +183,6 @@ class PeerActorSpec extends FlatSpec with Matchers { val remoteHello = Hello(4, "test-client", Seq(Capability("eth", Versions.PV63.toByte)), 9000, ByteString("unused")) rlpxConnection.expectMsgPF() { case RLPxConnectionHandler.SendMessage(_: HelloEnc) => () } rlpxConnection.send(peer, RLPxConnectionHandler.MessageReceived(remoteHello)) - rlpxConnection.expectMsgPF() { case PeerP2pVersion(version) if version == remoteHello.p2pVersion => ()} val header = Fixtures.Blocks.ValidBlock.header.copy(difficulty = daoForkBlockTotalDifficulty + 100000, number = 3000000) storagesInstance.storages.appStateStorage.putBestBlockNumber(3000000) // after the fork @@ -216,7 +213,6 @@ class PeerActorSpec extends FlatSpec with Matchers { val remoteHello = Hello(4, "test-client", Seq(Capability("eth", Versions.PV63.toByte)), 9000, ByteString("unused")) rlpxConnection.expectMsgPF() { case RLPxConnectionHandler.SendMessage(_: HelloEnc) => () } rlpxConnection.send(peer, RLPxConnectionHandler.MessageReceived(remoteHello)) - rlpxConnection.expectMsgPF() { case PeerP2pVersion(version) if version == remoteHello.p2pVersion => ()} val remoteStatus = Status( protocolVersion = Versions.PV63, @@ -260,7 +256,6 @@ class PeerActorSpec extends FlatSpec with Matchers { val remoteHello = Hello(4, "test-client", Seq(Capability("eth", Versions.PV63.toByte)), 9000, ByteString("unused")) rlpxConnection.expectMsgPF() { case RLPxConnectionHandler.SendMessage(_: HelloEnc) => () } rlpxConnection.send(peer, RLPxConnectionHandler.MessageReceived(remoteHello)) - rlpxConnection.expectMsgPF() { case PeerP2pVersion(version) if version == remoteHello.p2pVersion => ()} val remoteStatus = Status( protocolVersion = Versions.PV63, @@ -289,7 +284,6 @@ class PeerActorSpec extends FlatSpec with Matchers { val remoteHello = Hello(4, "test-client", Seq(Capability("eth", Versions.PV63.toByte)), 9000, ByteString("unused")) rlpxConnection.expectMsgPF() { case RLPxConnectionHandler.SendMessage(_: HelloEnc) => () } rlpxConnection.send(peer, RLPxConnectionHandler.MessageReceived(remoteHello)) - rlpxConnection.expectMsgPF() { case PeerP2pVersion(version) if version == remoteHello.p2pVersion => ()} val remoteStatus = Status( protocolVersion = Versions.PV63, @@ -333,7 +327,7 @@ class PeerActorSpec extends FlatSpec with Matchers { rlpxConnection.reply(RLPxConnectionHandler.ConnectionEstablished(remoteNodeId)) rlpxConnection.send(peerActor, RLPxConnectionHandler.MessageReceived(Ping())) - rlpxConnection.expectMsgPF() { case RLPxConnectionHandler.SendMessage(_: PongEnc) => ()} + rlpxConnection.expectMsgPF() { case RLPxConnectionHandler.SendMessage(_: PongEnc) => () } } it should "disconnect gracefully after handshake" in new TestSetup { @@ -345,7 +339,6 @@ class PeerActorSpec extends FlatSpec with Matchers { val remoteHello = Hello(4, "test-client", Seq(Capability("eth", Versions.PV63.toByte)), 9000, ByteString("unused")) rlpxConnection.expectMsgPF() { case RLPxConnectionHandler.SendMessage(_: HelloEnc) => () } rlpxConnection.send(peer, RLPxConnectionHandler.MessageReceived(remoteHello)) - rlpxConnection.expectMsgPF() { case PeerP2pVersion(version) if version == remoteHello.p2pVersion => ()} val remoteStatus = Status( protocolVersion = Versions.PV63, diff --git a/src/test/scala/io/iohk/ethereum/network/rlpx/RLPxConnectionHandlerSpec.scala b/src/test/scala/io/iohk/ethereum/network/rlpx/RLPxConnectionHandlerSpec.scala index cfb5edac1e..2576576f01 100644 --- a/src/test/scala/io/iohk/ethereum/network/rlpx/RLPxConnectionHandlerSpec.scala +++ b/src/test/scala/io/iohk/ethereum/network/rlpx/RLPxConnectionHandlerSpec.scala @@ -24,7 +24,7 @@ class RLPxConnectionHandlerSpec extends FlatSpec with Matchers with MockFactory setupIncomingRLPxConnection() - (mockMessageCodec.encodeMessage _).expects(Ping(): MessageSerializable, None).returning(ByteString("ping encoded")) + (mockMessageCodec.encodeMessage _).expects(Ping(): MessageSerializable).returning(ByteString("ping encoded")) rlpxConnection ! RLPxConnectionHandler.SendMessage(Ping()) connection.expectMsg(Tcp.Write(ByteString("ping encoded"), RLPxConnectionHandler.Ack)) @@ -32,7 +32,7 @@ class RLPxConnectionHandlerSpec extends FlatSpec with Matchers with MockFactory it should "write messages to TCP connection once all previous ACK were received" in new TestSetup { - (mockMessageCodec.encodeMessage _).expects(Ping(): MessageSerializable, None).returning(ByteString("ping encoded")).anyNumberOfTimes() + (mockMessageCodec.encodeMessage _).expects(Ping(): MessageSerializable).returning(ByteString("ping encoded")).anyNumberOfTimes() setupIncomingRLPxConnection() @@ -51,7 +51,7 @@ class RLPxConnectionHandlerSpec extends FlatSpec with Matchers with MockFactory it should "accummulate messages and write them when receiving ACKs" in new TestSetup { - (mockMessageCodec.encodeMessage _).expects(Ping(): MessageSerializable, None).returning(ByteString("ping encoded")).anyNumberOfTimes() + (mockMessageCodec.encodeMessage _).expects(Ping(): MessageSerializable).returning(ByteString("ping encoded")).anyNumberOfTimes() setupIncomingRLPxConnection() @@ -76,7 +76,7 @@ class RLPxConnectionHandlerSpec extends FlatSpec with Matchers with MockFactory } it should "close the connection when Ack timeout happens" in new TestSetup { - (mockMessageCodec.encodeMessage _).expects(Ping(): MessageSerializable, None).returning(ByteString("ping encoded")).anyNumberOfTimes() + (mockMessageCodec.encodeMessage _).expects(Ping(): MessageSerializable).returning(ByteString("ping encoded")).anyNumberOfTimes() setupIncomingRLPxConnection() @@ -88,7 +88,7 @@ class RLPxConnectionHandlerSpec extends FlatSpec with Matchers with MockFactory } it should "ignore timeout of old messages" in new TestSetup { - (mockMessageCodec.encodeMessage _).expects(Ping(): MessageSerializable, None).returning(ByteString("ping encoded")).anyNumberOfTimes() + (mockMessageCodec.encodeMessage _).expects(Ping(): MessageSerializable).returning(ByteString("ping encoded")).anyNumberOfTimes() setupIncomingRLPxConnection() @@ -117,8 +117,8 @@ class RLPxConnectionHandlerSpec extends FlatSpec with Matchers with MockFactory connection.expectMsgClass(classOf[Tcp.Register]) //AuthHandshaker throws exception on initial message - (mockHandshaker.handleInitialMessage _).expects(*).onCall{_: ByteString => throw new Exception("MAC invalid")} - (mockHandshaker.handleInitialMessageV4 _).expects(*).onCall{_: ByteString => throw new Exception("MAC invalid")} + (mockHandshaker.handleInitialMessage _).expects(*).onCall { _: ByteString => throw new Exception("MAC invalid") } + (mockHandshaker.handleInitialMessageV4 _).expects(*).onCall { _: ByteString => throw new Exception("MAC invalid") } val data = ByteString((0 until AuthHandshaker.InitiatePacketLength).map(_.toByte).toArray) rlpxConnection ! Tcp.Received(data) @@ -140,8 +140,8 @@ class RLPxConnectionHandlerSpec extends FlatSpec with Matchers with MockFactory tcpActorProbe.expectMsg(Tcp.Write(initPacket)) //AuthHandshaker handles the response message (that throws an invalid MAC) - (mockHandshaker.handleResponseMessage _).expects(*).onCall{_: ByteString => throw new Exception("MAC invalid")} - (mockHandshaker.handleResponseMessageV4 _).expects(*).onCall{_: ByteString => throw new Exception("MAC invalid")} + (mockHandshaker.handleResponseMessage _).expects(*).onCall { _: ByteString => throw new Exception("MAC invalid") } + (mockHandshaker.handleResponseMessageV4 _).expects(*).onCall { _: ByteString => throw new Exception("MAC invalid") } val data = ByteString((0 until AuthHandshaker.ResponsePacketLength).map(_.toByte).toArray) rlpxConnection ! Tcp.Received(data) @@ -191,7 +191,7 @@ class RLPxConnectionHandlerSpec extends FlatSpec with Matchers with MockFactory val data = ByteString((0 until AuthHandshaker.InitiatePacketLength).map(_.toByte).toArray) val response = ByteString("response data") (mockHandshaker.handleInitialMessage _).expects(data).returning((response, AuthHandshakeSuccess(mock[Secrets], ByteString()))) - (mockMessageCodec.readMessages _).expects(ByteString.empty, None).returning(Nil) //For processing of messages after handshaking finishes + (mockMessageCodec.readMessages _).expects(ByteString.empty).returning(Nil) //For processing of messages after handshaking finishes rlpxConnection ! Tcp.Received(data) connection.expectMsg(Tcp.Write(response)) From 76dedc0faaad128c510f4944f5ecc9b051f1fa86 Mon Sep 17 00:00:00 2001 From: KonradStaniec Date: Mon, 21 Sep 2020 14:05:29 +0200 Subject: [PATCH 8/9] [ETCM-102] Break connection in case of decoding failure --- .../io/iohk/ethereum/network/rlpx/RLPxConnectionHandler.scala | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/main/scala/io/iohk/ethereum/network/rlpx/RLPxConnectionHandler.scala b/src/main/scala/io/iohk/ethereum/network/rlpx/RLPxConnectionHandler.scala index d6b502e769..ff87333c4d 100644 --- a/src/main/scala/io/iohk/ethereum/network/rlpx/RLPxConnectionHandler.scala +++ b/src/main/scala/io/iohk/ethereum/network/rlpx/RLPxConnectionHandler.scala @@ -163,6 +163,8 @@ class RLPxConnectionHandler( case Failure(ex) => log.debug(s"Cannot decode message from $peerId, because of ${ex.getMessage}") + // break connection in case of failed decoding, to avoid attack which would send us garbage + context stop self } /** From 5665378bff7987ee263a1b6d8051f7a0e5a27d1a Mon Sep 17 00:00:00 2001 From: KonradStaniec Date: Mon, 21 Sep 2020 16:01:45 +0200 Subject: [PATCH 9/9] [ETCM-102] Fix conflicts after merge with develop --- src/it/scala/io/iohk/ethereum/sync/FastSyncItSpec.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/it/scala/io/iohk/ethereum/sync/FastSyncItSpec.scala b/src/it/scala/io/iohk/ethereum/sync/FastSyncItSpec.scala index b5b35a1d41..a2486f5803 100644 --- a/src/it/scala/io/iohk/ethereum/sync/FastSyncItSpec.scala +++ b/src/it/scala/io/iohk/ethereum/sync/FastSyncItSpec.scala @@ -13,7 +13,7 @@ import io.iohk.ethereum.Mocks.MockValidatorsAlwaysSucceed import io.iohk.ethereum.blockchain.sync.regular.BlockBroadcasterActor import io.iohk.ethereum.blockchain.sync.regular.BlockBroadcasterActor.BroadcastBlock import io.iohk.ethereum.blockchain.sync.{BlockBroadcast, BlockchainHostActor, FastSync, TestSyncConfig} -import io.iohk.ethereum.db.components.{SharedRocksDbDataSources, Storages} +import io.iohk.ethereum.db.components.{RocksDbDataSourceComponent, Storages} import io.iohk.ethereum.db.dataSource.{RocksDbConfig, RocksDbDataSource} import io.iohk.ethereum.db.storage.pruning.{ArchivePruning, PruningMode} import io.iohk.ethereum.db.storage.{AppStateStorage, Namespaces} @@ -187,7 +187,7 @@ object FastSyncItSpec { } lazy val nodeStatusHolder = new AtomicReference(nodeStatus) - lazy val storagesInstance = new SharedRocksDbDataSources with LocalPruningConfigBuilder with Storages.DefaultStorages { + lazy val storagesInstance = new RocksDbDataSourceComponent with LocalPruningConfigBuilder with Storages.DefaultStorages { override lazy val dataSource: RocksDbDataSource = RocksDbDataSource(getRockDbTestConfig(tempDir.toAbsolutePath.toString), Namespaces.nsSeq) } lazy val blockchainConfig = Config.blockchains.blockchainConfig