Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ object Dependencies {

val testing: Seq[ModuleID] = Seq(
"org.scalatest" %% "scalatest" % "3.2.2" % "it,test",
"org.scalamock" %% "scalamock" % "5.0.0" % "test",
"org.scalamock" %% "scalamock" % "5.0.0" % "it,test",
"org.scalatestplus" %% "scalacheck-1-15" % "3.2.3.0" % "test",
"org.scalacheck" %% "scalacheck" % "1.15.1" % "it,test",
"com.softwaremill.diffx" %% "diffx-core" % "0.3.30" % "test",
Expand Down
196 changes: 196 additions & 0 deletions src/it/scala/io/iohk/ethereum/ledger/BlockImporterItSpec.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,196 @@
package io.iohk.ethereum.ledger

import akka.testkit.TestProbe
import akka.util.ByteString
import cats.data.NonEmptyList
import io.iohk.ethereum.blockchain.sync.regular.BlockImporter.NewCheckpoint
import io.iohk.ethereum.blockchain.sync.regular.{BlockFetcher, BlockImporter}
import io.iohk.ethereum.checkpointing.CheckpointingTestHelpers
import io.iohk.ethereum.consensus.blocks.CheckpointBlockGenerator
import io.iohk.ethereum.domain._
import io.iohk.ethereum.mpt.MerklePatriciaTrie
import io.iohk.ethereum.utils.Config.SyncConfig
import io.iohk.ethereum.utils.Config
import io.iohk.ethereum.{Fixtures, ObjectGenerators, crypto}
import io.iohk.ethereum.ledger.Ledger.BlockResult
import monix.execution.Scheduler
import org.scalamock.scalatest.MockFactory
import org.scalatest.BeforeAndAfterAll
import org.scalatest.flatspec.AsyncFlatSpecLike
import org.scalatest.matchers.should.Matchers

import scala.concurrent.duration._

class BlockImporterItSpec extends MockFactory with TestSetupWithVmAndValidators with AsyncFlatSpecLike with Matchers with BeforeAndAfterAll {

implicit val testScheduler = Scheduler.fixedPool("test", 32)

override def afterAll(): Unit = {
testScheduler.shutdown()
testScheduler.awaitTermination(60.second)
}

val blockQueue = BlockQueue(blockchain, SyncConfig(Config.config))

val genesis = Block(
Fixtures.Blocks.Genesis.header.copy(stateRoot = ByteString(MerklePatriciaTrie.EmptyRootHash)),
Fixtures.Blocks.Genesis.body
)
val genesisWeight = ChainWeight.zero.increase(genesis.header)

blockchain.save(genesis, Seq(), genesisWeight, saveAsBestBlock = true)

lazy val checkpointBlockGenerator: CheckpointBlockGenerator = new CheckpointBlockGenerator

val fetcherProbe = TestProbe()
val ommersPoolProbe = TestProbe()
val broadcasterProbe = TestProbe()
val pendingTransactionsManagerProbe = TestProbe()
val supervisor = TestProbe()

val emptyWorld: InMemoryWorldStateProxy =
blockchain.getWorldStateProxy(
-1,
UInt256.Zero,
ByteString(MerklePatriciaTrie.EmptyRootHash),
noEmptyAccounts = false,
ethCompatibleStorage = true
)

override lazy val ledger = new TestLedgerImpl(successValidators) {
override private[ledger] lazy val blockExecution = new BlockExecution(blockchain, blockchainConfig, consensus.blockPreparator, blockValidation) {
override def executeAndValidateBlock(block: Block, alreadyValidated: Boolean = false): Either[BlockExecutionError, Seq[Receipt]] =
Right(BlockResult(emptyWorld).receipts)
}
}

val blockImporter = system.actorOf(
BlockImporter.props(
fetcherProbe.ref,
ledger,
blockchain,
syncConfig,
ommersPoolProbe.ref,
broadcasterProbe.ref,
pendingTransactionsManagerProbe.ref,
checkpointBlockGenerator,
supervisor.ref
))

val genesisBlock = blockchain.genesisBlock
val block1: Block = getBlock(genesisBlock.number + 1, parent = genesisBlock.header.hash)
// new chain is shorter but has a higher weight
val newBlock2: Block = getBlock(genesisBlock.number + 2, difficulty = 108, parent = block1.header.hash)
val newBlock3: Block = getBlock(genesisBlock.number + 3, difficulty = 300, parent = newBlock2.header.hash)
val oldBlock2: Block = getBlock(genesisBlock.number + 2, difficulty = 102, parent = block1.header.hash)
val oldBlock3: Block = getBlock(genesisBlock.number + 3, difficulty = 103, parent = oldBlock2.header.hash)
val oldBlock4: Block = getBlock(genesisBlock.number + 4, difficulty = 104, parent = oldBlock3.header.hash)

val weight1 = ChainWeight.totalDifficultyOnly(block1.header.difficulty)
val newWeight2 = weight1.increase(newBlock2.header)
val newWeight3 = newWeight2.increase(newBlock3.header)
val oldWeight2 = weight1.increase(oldBlock2.header)
val oldWeight3 = oldWeight2.increase(oldBlock3.header)
val oldWeight4 = oldWeight3.increase(oldBlock4.header)

//saving initial main chain
blockchain.save(block1, Nil, weight1, saveAsBestBlock = true)
blockchain.save(oldBlock2, Nil, oldWeight2, saveAsBestBlock = true)
blockchain.save(oldBlock3, Nil, oldWeight3, saveAsBestBlock = true)
blockchain.save(oldBlock4, Nil, oldWeight4, saveAsBestBlock = true)

val oldBranch = List(oldBlock2, oldBlock3, oldBlock4)
val newBranch = List(newBlock2, newBlock3)

blockImporter ! BlockImporter.Start

"BlockImporter" should "not discard blocks of the main chain if the reorganisation failed" in {

//ledger with not mocked blockExecution
val ledger = new TestLedgerImpl(successValidators)
val blockImporter = system.actorOf(
BlockImporter.props(
fetcherProbe.ref,
ledger,
blockchain,
syncConfig,
ommersPoolProbe.ref,
broadcasterProbe.ref,
pendingTransactionsManagerProbe.ref,
checkpointBlockGenerator,
supervisor.ref
))

blockImporter ! BlockImporter.Start
blockImporter ! BlockFetcher.PickedBlocks(NonEmptyList.fromListUnsafe(newBranch))

Thread.sleep(1000)
//because the blocks are not valid, we shouldn't reorganise, but at least stay with a current chain, and the best block of the current chain is oldBlock4
blockchain.getBestBlock().get shouldEqual oldBlock4
}

it should "return a correct new best block after reorganising longer chain to a shorter one if its weight is bigger" in {

//returning discarded initial chain
blockchain.save(oldBlock2, Nil, oldWeight2, saveAsBestBlock = true)
blockchain.save(oldBlock3, Nil, oldWeight3, saveAsBestBlock = true)
blockchain.save(oldBlock4, Nil, oldWeight4, saveAsBestBlock = true)

blockImporter ! BlockFetcher.PickedBlocks(NonEmptyList.fromListUnsafe(newBranch))

Thread.sleep(200)
blockchain.getBestBlock().get shouldEqual newBlock3
}


it should "switch to a branch with a checkpoint" in {

val checkpoint = ObjectGenerators.fakeCheckpointGen(3, 3).sample.get
val oldBlock5WithCheckpoint: Block = checkpointBlockGenerator.generate(oldBlock4, checkpoint)
blockchain.save(oldBlock5WithCheckpoint, Nil, oldWeight4, saveAsBestBlock = true)

val newBranch = List(newBlock2, newBlock3)

blockImporter ! BlockFetcher.PickedBlocks(NonEmptyList.fromListUnsafe(newBranch))

Thread.sleep(200)
blockchain.getBestBlock().get shouldEqual oldBlock5WithCheckpoint
blockchain.getLatestCheckpointBlockNumber() shouldEqual oldBlock5WithCheckpoint.header.number
}

it should "switch to a branch with a newer checkpoint" in {

val checkpoint = ObjectGenerators.fakeCheckpointGen(3, 3).sample.get
val newBlock4WithCheckpoint: Block = checkpointBlockGenerator.generate(newBlock3, checkpoint)
blockchain.save(newBlock4WithCheckpoint, Nil, newWeight3, saveAsBestBlock = true)

val newBranch = List(newBlock4WithCheckpoint)

blockImporter ! BlockFetcher.PickedBlocks(NonEmptyList.fromListUnsafe(newBranch))

Thread.sleep(200)
blockchain.getBestBlock().get shouldEqual newBlock4WithCheckpoint
blockchain.getLatestCheckpointBlockNumber() shouldEqual newBlock4WithCheckpoint.header.number
}

it should "return a correct checkpointed block after receiving a request for generating a new checkpoint" in {

val parent = blockchain.getBestBlock().get
val newBlock5: Block = getBlock(genesisBlock.number + 5, difficulty = 104, parent = parent.header.hash)
val newWeight5 = newWeight3.increase(newBlock5.header)

blockchain.save(newBlock5, Nil, newWeight5, saveAsBestBlock = true)

val signatures = CheckpointingTestHelpers.createCheckpointSignatures(
Seq(crypto.generateKeyPair(secureRandom)),
newBlock5.hash
)
blockImporter ! NewCheckpoint(newBlock5.hash, signatures)

val checkpointBlock = checkpointBlockGenerator.generate(newBlock5, Checkpoint(signatures))

Thread.sleep(1000)
blockchain.getBestBlock().get shouldEqual checkpointBlock
blockchain.getLatestCheckpointBlockNumber() shouldEqual newBlock5.header.number + 1
}
}
21 changes: 9 additions & 12 deletions src/main/scala/io/iohk/ethereum/domain/Blockchain.scala
Original file line number Diff line number Diff line change
Expand Up @@ -341,12 +341,6 @@ class BlockchainImpl(
}

def save(block: Block, receipts: Seq[Receipt], weight: ChainWeight, saveAsBestBlock: Boolean): Unit = {
log.debug("Saving new block block {} to database", block.idTag)
storeBlock(block)
.and(storeReceipts(block.header.hash, receipts))
.and(storeChainWeight(block.header.hash, weight))
.commit()

if (saveAsBestBlock && block.hasCheckpoint) {
log.debug(
"New best known block block number - {}, new best checkpoint number - {}",
Expand All @@ -362,6 +356,12 @@ class BlockchainImpl(
saveBestKnownBlock(block.header.number)
}

log.debug("Saving new block block {} to database", block.idTag)
storeBlock(block)
.and(storeReceipts(block.header.hash, receipts))
.and(storeChainWeight(block.header.hash, weight))
.commit()

// not transactional part
// the best blocks data will be persisted only when the cache will be persisted
stateStorage.onBlockSave(block.header.number, appStateStorage.getBestBlockNumber())(persistBestBlocksData)
Expand Down Expand Up @@ -397,20 +397,17 @@ class BlockchainImpl(
}
}

private def saveBestKnownBlock(bestBlockNumber: BigInt): Unit = {
private def saveBestKnownBlock(bestBlockNumber: BigInt): Unit =
bestKnownBlockAndLatestCheckpoint.updateAndGet(_.copy(bestBlockNumber = bestBlockNumber))
}

private def saveBestKnownBlockAndLatestCheckpointNumber(number: BigInt, latestCheckpointNumber: BigInt): Unit = {
private def saveBestKnownBlockAndLatestCheckpointNumber(number: BigInt, latestCheckpointNumber: BigInt): Unit =
bestKnownBlockAndLatestCheckpoint.set(BestBlockLatestCheckpointNumbers(number, latestCheckpointNumber))
}

def storeChainWeight(blockhash: ByteString, weight: ChainWeight): DataSourceBatchUpdate =
chainWeightStorage.put(blockhash, weight)

def saveNode(nodeHash: NodeHash, nodeEncoded: NodeEncoded, blockNumber: BigInt): Unit = {
def saveNode(nodeHash: NodeHash, nodeEncoded: NodeEncoded, blockNumber: BigInt): Unit =
stateStorage.saveNode(nodeHash, nodeEncoded, blockNumber)
}

override protected def getHashByBlockNumber(number: BigInt): Option[ByteString] =
blockNumberMappingStorage.get(number)
Expand Down
21 changes: 14 additions & 7 deletions src/main/scala/io/iohk/ethereum/ledger/BlockExecution.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,10 @@ import io.iohk.ethereum.ledger.BlockExecutionError.MissingParentError
import io.iohk.ethereum.ledger.Ledger.BlockResult
import io.iohk.ethereum.utils.{BlockchainConfig, DaoForkConfig, Logger}
import io.iohk.ethereum.vm.EvmConfig

import scala.annotation.tailrec
import cats.implicits._
import io.iohk.ethereum.mpt.MerklePatriciaTrie.MPTException

class BlockExecution(
blockchain: BlockchainImpl,
Expand Down Expand Up @@ -59,7 +62,9 @@ class BlockExecution(
.getBlockHeaderByHash(block.header.parentHash)
.toRight(MissingParentError) // Should not never occur because validated earlier
execResult <- executeBlockTransactions(block, parent)
worldToPersist = blockPreparator.payBlockReward(block, execResult.worldState)
worldToPersist <- Either
.catchOnly[MPTException](blockPreparator.payBlockReward(block, execResult.worldState))
.leftMap(BlockExecutionError.MPTError.apply)
// State root hash needs to be up-to-date for validateBlockAfterExecution
worldPersisted = InMemoryWorldStateProxy.persistState(worldToPersist)
} yield execResult.copy(worldState = worldPersisted)
Expand Down Expand Up @@ -169,19 +174,21 @@ sealed trait BlockExecutionError {

sealed trait BlockExecutionSuccess

case object BlockExecutionSuccess extends BlockExecutionSuccess
final case object BlockExecutionSuccess extends BlockExecutionSuccess

object BlockExecutionError {
case class ValidationBeforeExecError(reason: Any) extends BlockExecutionError
final case class ValidationBeforeExecError(reason: Any) extends BlockExecutionError

case class StateBeforeFailure(worldState: InMemoryWorldStateProxy, acumGas: BigInt, acumReceipts: Seq[Receipt])
final case class StateBeforeFailure(worldState: InMemoryWorldStateProxy, acumGas: BigInt, acumReceipts: Seq[Receipt])

case class TxsExecutionError(stx: SignedTransaction, stateBeforeError: StateBeforeFailure, reason: String)
final case class TxsExecutionError(stx: SignedTransaction, stateBeforeError: StateBeforeFailure, reason: String)
extends BlockExecutionError

case class ValidationAfterExecError(reason: String) extends BlockExecutionError
final case class ValidationAfterExecError(reason: String) extends BlockExecutionError

case object MissingParentError extends BlockExecutionError {
final case object MissingParentError extends BlockExecutionError {
override val reason: Any = "Cannot find parent"
}

final case class MPTError(reason: MPTException) extends BlockExecutionError
}
4 changes: 2 additions & 2 deletions src/main/scala/io/iohk/ethereum/ledger/BlockValidation.scala
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,13 @@ class BlockValidation(consensus: Consensus, blockchain: Blockchain, blockQueue:

def validateBlockAfterExecution(
block: Block,
hash: ByteString,
stateRootHash: ByteString,
receipts: Seq[Receipt],
gasUsed: BigInt
): Either[BlockExecutionError, BlockExecutionSuccess] = {
consensus.validators.validateBlockAfterExecution(
block = block,
stateRootHash = hash,
stateRootHash = stateRootHash,
receipts = receipts,
gasUsed = gasUsed
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ import scala.concurrent.ExecutionContext
* [[io.iohk.ethereum.nodebuilder.Node Node]].
*/
trait ScenarioSetup extends StdTestConsensusBuilder with SyncConfigBuilder with StdLedgerBuilder {
protected lazy val executionContext = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(4))
protected lazy val monixScheduler = Scheduler(executionContext)
protected lazy val executionContextExecutor = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(4))
protected lazy val monixScheduler = Scheduler(executionContextExecutor)
protected lazy val successValidators: Validators = Mocks.MockValidatorsAlwaysSucceed
protected lazy val failureValidators: Validators = Mocks.MockValidatorsAlwaysFail
protected lazy val ethashValidators: ValidatorsExecutor = ValidatorsExecutor(blockchainConfig, Protocol.Ethash)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ class BlockFetcherSpec
peersClient.expectMsgPF() { case PeersClient.Request(msg, _, _) if msg == firstGetBlockHeadersRequest => () }
}

"should not enqueue requested blocks if the received bodies does not match" in new TestSetup {
"should not enqueue requested blocks if the received bodies do not match" in new TestSetup {

// Important: Here we are forcing the mismatch between request headers and received bodies
override lazy val validators = new MockValidatorsFailingOnBlockBodies
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@ package io.iohk.ethereum.db.dataSource

import java.io.File
import java.nio.file.Files

import io.iohk.ethereum.ObjectGenerators
import io.iohk.ethereum.db.dataSource.DataSource.{Key, Namespace, Value}
import io.iohk.ethereum.db.dataSource.RocksDbDataSource.RocksDbDataSourceClosedException
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatestplus.scalacheck.ScalaCheckPropertyChecks

Expand Down Expand Up @@ -48,6 +48,15 @@ trait DataSourceTestBehavior extends ScalaCheckPropertyChecks with ObjectGenerat
}
}

it should "throw an exception if the rocksdb storage is unavailable" in {
withDir { path =>
val dataSource = createDataSource(path)
val someByteString = byteStringOfLengthNGen(KeySizeWithoutPrefix).sample.get
dataSource.destroy()
assertThrows[RocksDbDataSourceClosedException](dataSource.update(prepareUpdate(toUpsert = Seq(someByteString -> someByteString))))
}
}

it should "allow to remove keys" in {
val key1 = byteStringOfLengthNGen(KeySizeWithoutPrefix).sample.get
val key2 = byteStringOfLengthNGen(KeySizeWithoutPrefix).sample.get
Expand Down Expand Up @@ -111,5 +120,4 @@ trait DataSourceTestBehavior extends ScalaCheckPropertyChecks with ObjectGenerat
}
}
// scalastyle:on

}
Loading