Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -85,10 +85,12 @@ abstract class ScenarioSetup(_vm: VMImpl, scenario: BlockchainScenario) {
Block(scenario.genesisBlockHeader.toBlockHeader, BlockBody(Nil, Nil))
}

val genesisWeight = ChainWeight.zero.increase(genesisBlock.header)

blockchain
.storeBlock(genesisBlock)
.and(blockchain.storeReceipts(genesisBlock.header.hash, Nil))
.and(blockchain.storeTotalDifficulty(genesisBlock.header.hash, genesisBlock.header.difficulty))
.and(blockchain.storeReceipts(genesisBlock.hash, Nil))
.and(blockchain.storeChainWeight(genesisBlock.hash, genesisWeight))
.commit()

genesisBlock
Expand Down
4 changes: 2 additions & 2 deletions src/it/scala/io/iohk/ethereum/sync/RegularSyncItSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -84,13 +84,13 @@ class RegularSyncItSpec extends FreeSpecBase with Matchers with BeforeAndAfterAl
_ <- peer2.waitForRegularSyncLoadLastBlock(blockNumer + 3)
} yield {
assert(
peer1.bl.getTotalDifficultyByHash(peer1.bl.getBestBlock().hash) == peer2.bl.getTotalDifficultyByHash(
peer1.bl.getChainWeightByHash(peer1.bl.getBestBlock().hash) == peer2.bl.getChainWeightByHash(
peer2.bl.getBestBlock().hash
)
)
(peer1.bl.getBlockByNumber(blockNumer + 1), peer2.bl.getBlockByNumber(blockNumer + 1)) match {
case (Some(blockP1), Some(blockP2)) =>
assert(peer1.bl.getTotalDifficultyByHash(blockP1.hash) == peer2.bl.getTotalDifficultyByHash(blockP2.hash))
assert(peer1.bl.getChainWeightByHash(blockP1.hash) == peer2.bl.getChainWeightByHash(blockP2.hash))
case (_, _) => fail("invalid difficulty validation")
}
}
Expand Down
29 changes: 15 additions & 14 deletions src/it/scala/io/iohk/ethereum/sync/util/CommonFakePeer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import io.iohk.ethereum.db.components.{RocksDbDataSourceComponent, Storages}
import io.iohk.ethereum.db.dataSource.{RocksDbConfig, RocksDbDataSource}
import io.iohk.ethereum.db.storage.pruning.{ArchivePruning, PruningMode}
import io.iohk.ethereum.db.storage.{AppStateStorage, Namespaces}
import io.iohk.ethereum.domain.{Block, Blockchain, BlockchainImpl}
import io.iohk.ethereum.domain.{Block, Blockchain, BlockchainImpl, ChainWeight}
import io.iohk.ethereum.ledger.InMemoryWorldStateProxy
import io.iohk.ethereum.mpt.MerklePatriciaTrie
import io.iohk.ethereum.network.EtcPeerManagerActor.PeerInfo
Expand Down Expand Up @@ -116,8 +116,9 @@ abstract class CommonFakePeer(peerName: String, fakePeerCustomConfig: FakePeerCu
Fixtures.Blocks.Genesis.header.copy(stateRoot = ByteString(MerklePatriciaTrie.EmptyRootHash)),
Fixtures.Blocks.Genesis.body
)
val genesisWeight = ChainWeight.zero.increase(genesis.header)

bl.save(genesis, Seq(), genesis.header.difficulty, saveAsBestBlock = true)
bl.save(genesis, Seq(), genesisWeight, saveAsBestBlock = true)

lazy val nh = nodeStatusHolder

Expand Down Expand Up @@ -228,15 +229,15 @@ abstract class CommonFakePeer(peerName: String, fakePeerCustomConfig: FakePeerCu
)
}

private def broadcastBlock(block: Block, td: BigInt) = {
broadcasterActor ! BroadcastBlock(NewBlock(block, td))
private def broadcastBlock(block: Block, weight: ChainWeight) = {
broadcasterActor ! BroadcastBlock(NewBlock(block, weight))
}

def getCurrentState(): BlockchainState = {
val bestBlock = bl.getBestBlock()
val currentWorldState = getMptForBlock(bestBlock)
val currentTd = bl.getTotalDifficultyByHash(bestBlock.hash).get
BlockchainState(bestBlock, currentWorldState, currentTd)
val currentWeight = bl.getChainWeightByHash(bestBlock.hash).get
BlockchainState(bestBlock, currentWorldState, currentWeight)
}

def startPeer(): Task[Unit] = {
Expand Down Expand Up @@ -272,16 +273,16 @@ abstract class CommonFakePeer(peerName: String, fakePeerCustomConfig: FakePeerCu
} yield ()
}

private def createChildBlock(parent: Block, parentTd: BigInt, parentWorld: InMemoryWorldStateProxy)(
private def createChildBlock(parent: Block, parentWeight: ChainWeight, parentWorld: InMemoryWorldStateProxy)(
updateWorldForBlock: (BigInt, InMemoryWorldStateProxy) => InMemoryWorldStateProxy
): (Block, BigInt, InMemoryWorldStateProxy) = {
): (Block, ChainWeight, InMemoryWorldStateProxy) = {
val newBlockNumber = parent.header.number + 1
val newWorld = updateWorldForBlock(newBlockNumber, parentWorld)
val newBlock = parent.copy(header =
parent.header.copy(parentHash = parent.header.hash, number = newBlockNumber, stateRoot = newWorld.stateRootHash)
)
val newTd = newBlock.header.difficulty + parentTd
(newBlock, newTd, parentWorld)
val newWeight = parentWeight.increase(newBlock.header)
(newBlock, newWeight, parentWorld)
}

def importBlocksUntil(
Expand All @@ -292,12 +293,12 @@ abstract class CommonFakePeer(peerName: String, fakePeerCustomConfig: FakePeerCu
Task(())
} else {
Task {
val currentTd = bl.getTotalDifficultyByHash(block.hash).get
val currentWeight = bl.getChainWeightByHash(block.hash).get
val currentWolrd = getMptForBlock(block)
val (newBlock, newTd, newWorld) = createChildBlock(block, currentTd, currentWolrd)(updateWorldForBlock)
bl.save(newBlock, Seq(), newTd, saveAsBestBlock = true)
val (newBlock, newWeight, _) = createChildBlock(block, currentWeight, currentWolrd)(updateWorldForBlock)
bl.save(newBlock, Seq(), newWeight, saveAsBestBlock = true)
bl.persistCachedNodes()
broadcastBlock(newBlock, newTd)
broadcastBlock(newBlock, newWeight)
}.flatMap(_ => importBlocksUntil(n)(updateWorldForBlock))
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,12 +92,12 @@ object RegularSyncItSpecUtils {
case None => bl.getBestBlock()
}).flatMap { block =>
Task {
val currentTd = bl
.getTotalDifficultyByHash(block.hash)
.getOrElse(throw new RuntimeException(s"block by hash: ${block.hash} doesn't exist"))
val currentWolrd = getMptForBlock(block)
val (newBlock, newTd, newWorld) = createChildBlock(block, currentTd, currentWolrd)(updateWorldForBlock)
broadcastBlock(newBlock, newTd)
val currentWeight = bl
.getChainWeightByHash(block.hash)
.getOrElse(throw new RuntimeException(s"ChainWeight by hash: ${block.hash} doesn't exist"))
val currentWorld = getMptForBlock(block)
val (newBlock, newWeight, _) = createChildBlock(block, currentWeight, currentWorld)(updateWorldForBlock)
broadcastBlock(newBlock, newWeight)
}
}
}
Expand All @@ -110,12 +110,12 @@ object RegularSyncItSpecUtils {
plusDifficulty: BigInt = 0
)(updateWorldForBlock: (BigInt, InMemoryWorldStateProxy) => InMemoryWorldStateProxy): Task[Unit] = Task {
val block: Block = bl.getBestBlock()
val currentTd = bl
.getTotalDifficultyByHash(block.hash)
.getOrElse(throw new RuntimeException(s"block by hash: ${block.hash} doesn't exist"))
val currentWeight = bl
.getChainWeightByHash(block.hash)
.getOrElse(throw new RuntimeException(s"ChainWeight by hash: ${block.hash} doesn't exist"))
val currentWolrd = getMptForBlock(block)
val (newBlock, newTd, newWorld) =
createChildBlock(block, currentTd, currentWolrd, plusDifficulty)(updateWorldForBlock)
val (newBlock, _, _) =
createChildBlock(block, currentWeight, currentWolrd, plusDifficulty)(updateWorldForBlock)
regularSync ! SyncProtocol.MinedBlock(newBlock)
}

Expand All @@ -139,18 +139,18 @@ object RegularSyncItSpecUtils {
)
}

private def broadcastBlock(block: Block, td: BigInt) = {
broadcasterActor ! BroadcastBlock(NewBlock(block, td))
private def broadcastBlock(block: Block, weight: ChainWeight) = {
broadcasterActor ! BroadcastBlock(NewBlock(block, weight))
}

private def createChildBlock(
parent: Block,
parentTd: BigInt,
parentWeight: ChainWeight,
parentWorld: InMemoryWorldStateProxy,
plusDifficulty: BigInt = 0
)(
updateWorldForBlock: (BigInt, InMemoryWorldStateProxy) => InMemoryWorldStateProxy
): (Block, BigInt, InMemoryWorldStateProxy) = {
): (Block, ChainWeight, InMemoryWorldStateProxy) = {
val newBlockNumber = parent.header.number + 1
val newWorld = updateWorldForBlock(newBlockNumber, parentWorld)
val newBlock = parent.copy(header =
Expand All @@ -161,8 +161,8 @@ object RegularSyncItSpecUtils {
difficulty = plusDifficulty + parent.header.difficulty
)
)
val newTd = newBlock.header.difficulty + parentTd
(newBlock, newTd, parentWorld)
val newWeight = parentWeight.increase(newBlock.header)
(newBlock, newWeight, parentWorld)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package io.iohk.ethereum.sync.util

import java.net.{InetSocketAddress, ServerSocket}

import io.iohk.ethereum.domain.Block
import io.iohk.ethereum.domain.{Block, ChainWeight}
import io.iohk.ethereum.ledger.InMemoryWorldStateProxy

object SyncCommonItSpec {
Expand All @@ -17,5 +17,9 @@ object SyncCommonItSpec {
}
}

final case class BlockchainState(bestBlock: Block, currentWorldState: InMemoryWorldStateProxy, currentTd: BigInt)
final case class BlockchainState(
bestBlock: Block,
currentWorldState: InMemoryWorldStateProxy,
currentWeight: ChainWeight
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -148,13 +148,13 @@ class BlockchainMock(genesisHash: ByteString) extends Blockchain {

override def storeEvmCode(hash: ByteString, evmCode: ByteString): DataSourceBatchUpdate = ???

override def storeTotalDifficulty(blockhash: ByteString, totalDifficulty: BigInt): DataSourceBatchUpdate = ???
override def storeChainWeight(blockhash: ByteString, chainWeight: ChainWeight): DataSourceBatchUpdate = ???

override def saveNode(nodeHash: NodeHash, nodeEncoded: NodeEncoded, blockNumber: BigInt): Unit = ???

override def removeBlock(hash: ByteString, withState: Boolean = true): Unit = ???

override def getTotalDifficultyByHash(blockhash: ByteString): Option[BigInt] = ???
override def getChainWeightByHash(blockhash: ByteString): Option[ChainWeight] = ???

override def getEvmCodeByHash(hash: ByteString): Option[ByteString] = ???

Expand Down Expand Up @@ -194,7 +194,7 @@ class BlockchainMock(genesisHash: ByteString) extends Blockchain {

def getBestBlock(): Block = ???

override def save(block: Block, receipts: Seq[Receipt], totalDifficulty: BigInt, saveAsBestBlock: Boolean): Unit = ???
override def save(block: Block, receipts: Seq[Receipt], weight: ChainWeight, saveAsBestBlock: Boolean): Unit = ???

override def getStateStorage: StateStorage = ???

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ object FixtureProvider {
override val blockHeadersStorage: BlockHeadersStorage = new BlockHeadersStorage(dataSource)
override val blockNumberMappingStorage: BlockNumberMappingStorage = new BlockNumberMappingStorage(dataSource)
override val blockBodiesStorage: BlockBodiesStorage = new BlockBodiesStorage(dataSource)
override val totalDifficultyStorage: TotalDifficultyStorage = new TotalDifficultyStorage(dataSource)
override val chainWeightStorage: ChainWeightStorage = new ChainWeightStorage(dataSource)
override val transactionMappingStorage: TransactionMappingStorage = new TransactionMappingStorage(dataSource)
override val nodeStorage: NodeStorage = new NodeStorage(dataSource)
override val cachedNodeStorage: CachedNodeStorage = new CachedNodeStorage(nodeStorage, caches.nodeCache)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,12 @@ class GenesisDataLoader(blockchain: Blockchain, blockchainConfig: BlockchainConf
case None =>
storage.persist()
stateStorage.forcePersist(GenesisDataLoad)
blockchain.save(Block(header, BlockBody(Nil, Nil)), Nil, header.difficulty, saveAsBestBlock = true)
blockchain.save(
Block(header, BlockBody(Nil, Nil)),
Nil,
ChainWeight.totalDifficultyOnly(header.difficulty),
saveAsBestBlock = true
)
Success(())
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,7 @@ class BlockBroadcast(val etcPeerManager: ActorRef, syncConfig: SyncConfig) {

private def shouldSendNewBlock(newBlock: NewBlock, peerInfo: PeerInfo): Boolean =
newBlock.block.header.number > peerInfo.maxBlockNumber ||
newBlock.totalDifficulty > peerInfo.totalDifficulty ||
newBlock.latestCheckpointNumber > peerInfo.latestCheckpointNumber
newBlock.chainWeight > peerInfo.chainWeight

private def broadcastNewBlock(newBlock: NewBlock, peers: Set[Peer]): Unit =
obtainRandomPeerSubset(peers).foreach { peer =>
Expand Down
27 changes: 15 additions & 12 deletions src/main/scala/io/iohk/ethereum/blockchain/sync/FastSync.scala
Original file line number Diff line number Diff line change
Expand Up @@ -341,8 +341,8 @@ class FastSync(
val header = headers.head
processHeader(header, peer) match {
case Left(result) => result
case Right(headerAndDif) =>
updateSyncState(headerAndDif._1, headerAndDif._2)
case Right((header, weight)) =>
updateSyncState(header, weight)
if (header.number == syncState.safeDownloadTarget) {
ImportedPivotBlock
} else {
Expand Down Expand Up @@ -371,10 +371,10 @@ class FastSync(
}
}

private def updateSyncState(header: BlockHeader, parentTd: BigInt): Unit = {
private def updateSyncState(header: BlockHeader, parentWeight: ChainWeight): Unit = {
blockchain
.storeBlockHeader(header)
.and(blockchain.storeTotalDifficulty(header.hash, parentTd + header.difficulty))
.and(blockchain.storeChainWeight(header.hash, parentWeight.increase(header)))
.commit()

if (header.number > syncState.bestBlockHeaderNumber) {
Expand All @@ -391,14 +391,17 @@ class FastSync(
syncState = syncState.updateNextBlockToValidate(header, K, X)
}

private def processHeader(header: BlockHeader, peer: Peer): Either[HeaderProcessingResult, (BlockHeader, BigInt)] =
private def processHeader(
header: BlockHeader,
peer: Peer
): Either[HeaderProcessingResult, (BlockHeader, ChainWeight)] =
for {
validatedHeader <- validateHeader(header, peer)
parentDifficulty <- getParentDifficulty(header)
} yield (validatedHeader, parentDifficulty)
parentWeight <- getParentChainWeight(header)
} yield (validatedHeader, parentWeight)

private def getParentDifficulty(header: BlockHeader) = {
blockchain.getTotalDifficultyByHash(header.parentHash).toRight(ParentDifficultyNotFound(header))
private def getParentChainWeight(header: BlockHeader) = {
blockchain.getChainWeightByHash(header.parentHash).toRight(ParentChainWeightNotFound(header))
}

private def handleRewind(header: BlockHeader, peer: Peer, N: Int): Unit = {
Expand All @@ -419,11 +422,11 @@ class FastSync(
private def handleBlockHeaders(peer: Peer, headers: Seq[BlockHeader]) = {
if (checkHeadersChain(headers)) {
processHeaders(peer, headers) match {
case ParentDifficultyNotFound(header) =>
case ParentChainWeightNotFound(header) =>
// We could end in wrong fork and get blocked so we should rewind our state a little
// we blacklist peer just in case we got malicious peer which would send us bad blocks, forcing us to rollback
// to genesis
log.warning("Parent difficulty not found for block {}, not processing rest of headers", header.idTag)
log.warning("Parent chain weight not found for block {}, not processing rest of headers", header.idTag)
handleRewind(header, peer, syncConfig.fastSyncBlockValidationN)
case HeadersProcessingFinished =>
processSyncing()
Expand Down Expand Up @@ -890,7 +893,7 @@ object FastSync {

sealed abstract class HeaderProcessingResult
case object HeadersProcessingFinished extends HeaderProcessingResult
case class ParentDifficultyNotFound(header: BlockHeader) extends HeaderProcessingResult
case class ParentChainWeightNotFound(header: BlockHeader) extends HeaderProcessingResult
case class ValidationFailed(header: BlockHeader, peer: Peer) extends HeaderProcessingResult
case object ImportedPivotBlock extends HeaderProcessingResult

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,12 +161,12 @@ object PeersClient {

def bestPeer(peersToDownloadFrom: Map[Peer, PeerInfo]): Option[Peer] = {
val peersToUse = peersToDownloadFrom
.collect { case (ref, PeerInfo(_, totalDifficulty, latestChkp, true, _, _)) =>
(ref, totalDifficulty, latestChkp)
.collect { case (ref, PeerInfo(_, chainWeight, true, _, _)) =>
(ref, chainWeight)
}

if (peersToUse.nonEmpty) {
val (peer, _, _) = peersToUse.maxBy { case (_, td, latestChkp) => latestChkp -> td }
val (peer, _) = peersToUse.maxBy(_._2)
Some(peer)
} else {
None
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,9 +173,8 @@ class PivotBlockSelector(
}

private def collectVoters: ElectionDetails = {
val peersUsedToChooseTarget = peersToDownloadFrom.collect {
case (peer, PeerInfo(_, _, _, true, maxBlockNumber, _)) =>
(peer, maxBlockNumber)
val peersUsedToChooseTarget = peersToDownloadFrom.collect { case (peer, PeerInfo(_, _, true, maxBlockNumber, _)) =>
(peer, maxBlockNumber)
}

val peersSortedByBestNumber = peersUsedToChooseTarget.toList.sortBy { case (_, number) => -number }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,8 @@ class SyncStateSchedulerActor(
case Some((startSignal: StartSyncingTo, sender)) =>
val initStats = ProcessingStatistics().addSaved(result.writtenElements)
startSyncing(startSignal.stateRoot, startSignal.blockNumber, initStats, sender)
case Some((restartSignal: RestartRequested.type, sender)) =>
case Some((RestartRequested, sender)) =>
// TODO: are we testing this path?
sender ! WaitingForNewTargetBlock
context.become(idle(ProcessingStatistics().addSaved(result.writtenElements)))
case _ =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ class BlockFetcher(
supervisor ! ProgressProtocol.GotNewBlock(newState.knownTop)

fetchBlocks(newState)
case MessageFromPeer(NewBlock(block, _, _), peerId) =>
case MessageFromPeer(NewBlock(_, block, _), peerId) =>
val newBlockNr = block.number
val nextExpectedBlock = state.lastFullBlockNumber + 1

Expand Down
Loading