Merged

17 commits
a4217e5
[ETCM-213] Expose db iterator over key-vals
KonradStaniec Oct 15, 2020
a6258da
[ETCM-213] Update SyncStateActor to load bloom filter at start
KonradStaniec Oct 16, 2020
1bdc6b9
Merge remote-tracking branch 'origin/develop' into etcm-213/relad-blo…
KonradStaniec Oct 16, 2020
116782f
[ETCM-213] Properly handle restart while loading bloom filter
KonradStaniec Oct 19, 2020
524a463
[ETCM-213] Extract bloom loading to sparate class. More tests.
KonradStaniec Oct 19, 2020
519f4d6
Merge remote-tracking branch 'origin/develop' into etcm-213/relad-blo…
KonradStaniec Oct 20, 2020
54e75c1
[ETCM-213] Fix scalafmt
KonradStaniec Oct 20, 2020
e9833d1
Merge remote-tracking branch 'origin/develop' into etcm-213/relad-blo…
KonradStaniec Oct 20, 2020
e4029e3
[ETCM-213] Refactor iterator impl
KonradStaniec Oct 21, 2020
cc0cd99
Merge remote-tracking branch 'origin/develop' into etcm-213/relad-blo…
KonradStaniec Oct 21, 2020
6642568
[ETCM-213] Pr comments
KonradStaniec Oct 22, 2020
5d37321
Merge remote-tracking branch 'origin/develop' into etcm-213/relad-blo…
KonradStaniec Oct 22, 2020
6e7bb45
Merge remote-tracking branch 'origin/develop' into etcm-213/relad-blo…
KonradStaniec Oct 23, 2020
29ca388
[ETCM-213] Fix merge conflicts
KonradStaniec Oct 23, 2020
ce8a83f
[ETCM-213] Add one todo regarding async processing
KonradStaniec Oct 23, 2020
f46fd50
Merge remote-tracking branch 'origin/develop' into etcm-213/relad-blo…
KonradStaniec Oct 23, 2020
1667f33
Merge remote-tracking branch 'origin/develop' into etcm-213/relad-blo…
KonradStaniec Oct 26, 2020
155 changes: 155 additions & 0 deletions src/it/scala/io/iohk/ethereum/db/RockDbIteratorSpec.scala
@@ -0,0 +1,155 @@
package io.iohk.ethereum.db

import java.nio.file.Files

import akka.util.ByteString
import cats.effect.Resource
import cats.effect.concurrent.{Deferred, Ref}
import io.iohk.ethereum.db.dataSource.{DataSourceUpdateOptimized, RocksDbConfig, RocksDbDataSource}
import io.iohk.ethereum.db.storage.{EvmCodeStorage, Namespaces, NodeStorage}
import io.iohk.ethereum.{FlatSpecBase, ResourceFixtures}
import monix.eval.Task
import monix.reactive.{Consumer, Observable}
import org.scalatest.matchers.should.Matchers

import scala.util.Random

class RockDbIteratorSpec extends FlatSpecBase with ResourceFixtures with Matchers {
type Fixture = RocksDbDataSource

override def fixtureResource: Resource[Task, RocksDbDataSource] = RockDbIteratorSpec.buildRockDbResource()

def genRandomArray(): Array[Byte] = {
val arr = new Array[Byte](32)
Random.nextBytes(arr)
arr
}

def genRandomByteString(): ByteString = {
ByteString.fromArrayUnsafe(genRandomArray())
}

def writeNValuesToDb(n: Int, db: RocksDbDataSource, namespace: IndexedSeq[Byte]): Task[Unit] = {
val iterable = (0 until n)
Observable.fromIterable(iterable).foreachL { _ =>
db.update(Seq(DataSourceUpdateOptimized(namespace, Seq(), Seq((genRandomArray(), genRandomArray())))))
}
}

it should "cancel ongoing iteration" in testCaseT { db =>
val largeNum = 1000000
val finishMark = 20000
for {
counter <- Ref.of[Task, Int](0)
cancelMark <- Deferred[Task, Unit]
_ <- writeNValuesToDb(largeNum, db, Namespaces.NodeNamespace)
fib <- db
.iterate(Namespaces.NodeNamespace)
.map(_.right.get)
.consumeWith(Consumer.foreachEval[Task, (Array[Byte], Array[Byte])] { _ =>
for {
cur <- counter.updateAndGet(i => i + 1)
_ <- if (cur == finishMark) cancelMark.complete(()) else Task.unit
} yield ()
})
.start
_ <- cancelMark.get
// Keep in mind this test also checks that all underlying RocksDB resources have been cleaned up: if cancel
// did not close the underlying DbIterator, the whole test would kill the JVM with a native-level RocksDB error,
// because iterators need to be closed before closing the db.
_ <- fib.cancel
finalCounter <- counter.get
} yield {
assert(finalCounter < largeNum)
}
}

it should "read all key values in db" in testCaseT { db =>
val largeNum = 100000
for {
counter <- Ref.of[Task, Int](0)
_ <- writeNValuesToDb(largeNum, db, Namespaces.NodeNamespace)
_ <- db
.iterate(Namespaces.NodeNamespace)
.map(_.right.get)
.consumeWith(Consumer.foreachEval[Task, (Array[Byte], Array[Byte])] { _ =>
counter.update(current => current + 1)
})
finalCounter <- counter.get
} yield {
assert(finalCounter == largeNum)
}
}

it should "iterate over keys and values from different namespaces" in testCaseT { db =>
val codeStorage = new EvmCodeStorage(db)
val codeKeyValues = (1 to 10).map(i => (ByteString(i.toByte), ByteString(i.toByte))).toList

val nodeStorage = new NodeStorage(db)
val nodeKeyValues = (20 to 30).map(i => (ByteString(i.toByte), ByteString(i.toByte).toArray)).toList

for {
_ <- Task(codeStorage.update(Seq(), codeKeyValues).commit())
_ <- Task(nodeStorage.update(Seq(), nodeKeyValues))
result <- Task.parZip2(
codeStorage.storageContent.map(_.right.get).map(_._1).toListL,
nodeStorage.storageContent.map(_.right.get).map(_._1).toListL
)
(codeResult, nodeResult) = result
} yield {
codeResult shouldEqual codeKeyValues.map(_._1)
nodeResult shouldEqual nodeKeyValues.map(_._1)
}
}

it should "iterate over keys and values " in testCaseT { db =>
val keyValues = (1 to 100).map(i => (ByteString(i.toByte), ByteString(i.toByte))).toList
for {
_ <- Task(
db.update(
Seq(
DataSourceUpdateOptimized(Namespaces.NodeNamespace, Seq(), keyValues.map(e => (e._1.toArray, e._2.toArray)))
)
)
)
elems <- db.iterate(Namespaces.NodeNamespace).map(_.right.get).toListL
} yield {
val deserialized = elems.map { case (bytes, bytes1) => (ByteString(bytes), ByteString(bytes1)) }
assert(elems.size == keyValues.size)
assert(keyValues == deserialized)
}
}

it should "return empty list when iterating empty db" in testCaseT { db =>
for {
elems <- db.iterate().toListL
} yield {
assert(elems.isEmpty)
}
}
}

object RockDbIteratorSpec {
def getRockDbTestConfig(dbPath: String) = {
new RocksDbConfig {
override val createIfMissing: Boolean = true
override val paranoidChecks: Boolean = false
override val path: String = dbPath
override val maxThreads: Int = 1
override val maxOpenFiles: Int = 32
override val verifyChecksums: Boolean = false
override val levelCompaction: Boolean = true
override val blockSize: Long = 16384
override val blockCacheSize: Long = 33554432
}
}

def buildRockDbResource(): Resource[Task, RocksDbDataSource] = {
Resource.make {
Task {
val tempDir = Files.createTempDirectory("temp-iter-dir")
RocksDbDataSource(getRockDbTestConfig(tempDir.toAbsolutePath.toString), Namespaces.nsSeq)
}
}(db => Task(db.destroy()))
}
}
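The spec above exercises a streaming iteration API on RocksDbDataSource whose definition is not shown in this section. Below is a minimal sketch of its assumed shape, inferred from the call sites above; the trait and object names are illustrative, not the PR's actual declarations.

import monix.reactive.Observable

object RocksDbIterationSketch {
  // Stand-in for RocksDbDataSource.IterationError used elsewhere in this PR.
  final case class IterationError(ex: Throwable)

  // Assumed shape of the iteration API the spec above exercises.
  trait KeyValueIteration {
    // Lazily stream every (key, value) pair under the given namespace;
    // emit Left(error) and complete on the first iteration failure.
    def iterate(namespace: IndexedSeq[Byte]): Observable[Either[IterationError, (Array[Byte], Array[Byte])]]

    // Same, but over the entire database, ignoring namespaces.
    def iterate(): Observable[Either[IterationError, (Array[Byte], Array[Byte])]]
  }
}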
31 changes: 27 additions & 4 deletions src/it/scala/io/iohk/ethereum/sync/FastSyncItSpec.scala
@@ -133,12 +133,31 @@ class FastSyncItSpec extends FlatSpecBase with Matchers with BeforeAndAfterAll {
}
}

it should "sync state to peer from partially synced state" in customTestCaseResourceM(
FakePeer.start2FakePeersRes()
) { case (peer1, peer2) =>
for {
_ <- peer2.importBlocksUntil(2000)(updateStateAtBlock(1500))
_ <- peer2.importBlocksUntil(3000)(updateStateAtBlock(2500, 1000, 2000))
_ <- peer1.importBlocksUntil(2000)(updateStateAtBlock(1500))
_ <- peer1.startWithState()
_ <- peer1.connectToPeers(Set(peer2.node))
_ <- peer1.startFastSync().delayExecution(50.milliseconds)
_ <- peer1.waitForFastSyncFinish()
} yield {
assert(peer1.bl.getBestBlockNumber() == peer2.bl.getBestBlockNumber() - peer2.testSyncConfig.pivotBlockOffset)
}
}
}

object FastSyncItSpec {

def updateWorldWithNAccounts(n: Int, world: InMemoryWorldStateProxy): InMemoryWorldStateProxy = {
val resultWorld = (0 until n).foldLeft(world) { (world, num) =>
def updateWorldWithAccounts(
startAccount: Int,
endAccount: Int,
world: InMemoryWorldStateProxy
): InMemoryWorldStateProxy = {
val resultWorld = (startAccount until endAccount).foldLeft(world) { (world, num) =>
val randomBalance = num
val randomAddress = Address(num)
val codeBytes = BigInt(num).toByteArray
@@ -152,10 +171,14 @@ object FastSyncItSpec {
InMemoryWorldStateProxy.persistState(resultWorld)
}

def updateStateAtBlock(blockWithUpdate: BigInt): (BigInt, InMemoryWorldStateProxy) => InMemoryWorldStateProxy = {
def updateStateAtBlock(
blockWithUpdate: BigInt,
startAccount: Int = 0,
endAccount: Int = 1000
): (BigInt, InMemoryWorldStateProxy) => InMemoryWorldStateProxy = {
(blockNr: BigInt, world: InMemoryWorldStateProxy) =>
if (blockNr == blockWithUpdate) {
updateWorldWithNAccounts(1000, world)
updateWorldWithAccounts(startAccount, endAccount, world)
} else {
IdentityUpdate(blockNr, world)
}
12 changes: 12 additions & 0 deletions src/it/scala/io/iohk/ethereum/sync/util/FastSyncItSpecUtils.scala
@@ -4,6 +4,7 @@ import akka.util.ByteString
import cats.effect.Resource
import io.iohk.ethereum.Mocks.MockValidatorsAlwaysSucceed
import io.iohk.ethereum.blockchain.sync.FastSync
import io.iohk.ethereum.blockchain.sync.FastSync.SyncState
import io.iohk.ethereum.crypto.kec256
import io.iohk.ethereum.domain.Address
import io.iohk.ethereum.mpt.{HashNode, MptNode, MptTraversals}
@@ -89,6 +90,17 @@ object FastSyncItSpecUtils {

go(0)
}

def startWithState(): Task[Unit] = {
Task {
val currentBest = bl.getBestBlock().header
val safeTarget = currentBest.number + syncConfig.fastSyncBlockValidationX
val nextToValidate = currentBest.number + 1
val syncState = SyncState(currentBest, safeTarget, Seq(), Seq(), 0, 0, currentBest.number, nextToValidate)
storagesInstance.storages.fastSyncStateStorage.putSyncState(syncState)
}.map(_ => ())
}

}

object FakePeer {
src/main/scala/io/iohk/ethereum/txExecTest/util/DumpChainApp.scala
@@ -22,12 +22,13 @@ import io.iohk.ethereum.nodebuilder.{AuthHandshakerBuilder, NodeKeyBuilder, Secu
import io.iohk.ethereum.utils.{Config, NodeStatus, ServerStatus}
import java.util.concurrent.atomic.AtomicReference

import io.iohk.ethereum.db.dataSource.DataSourceBatchUpdate
import io.iohk.ethereum.db.dataSource.{DataSourceBatchUpdate, RocksDbDataSource}
import org.bouncycastle.util.encoders.Hex

import scala.concurrent.duration._
import io.iohk.ethereum.domain.BlockHeader.HeaderExtraFields.HefEmpty
import io.iohk.ethereum.network.discovery.DiscoveryConfig
import monix.reactive.Observable

object DumpChainApp extends App with NodeKeyBuilder with SecureRandomBuilder with AuthHandshakerBuilder {
val conf = ConfigFactory.load("txExecTest/chainDump.conf")
@@ -197,4 +198,6 @@ class BlockchainMock(genesisHash: ByteString) extends Blockchain {
override def getStateStorage: StateStorage = ???

override def getLatestCheckpointBlockNumber(): BigInt = ???

override def mptStateSavedKeys(): Observable[Either[RocksDbDataSource.IterationError, NodeHash]] = ???
}
41 changes: 41 additions & 0 deletions src/main/scala/io/iohk/ethereum/blockchain/sync/LoadableBloomFilter.scala
@@ -0,0 +1,41 @@
package io.iohk.ethereum.blockchain.sync

import com.google.common.hash.{BloomFilter, Funnel}
import io.iohk.ethereum.blockchain.sync.LoadableBloomFilter.BloomFilterLoadingResult
import io.iohk.ethereum.db.dataSource.RocksDbDataSource.IterationError
import monix.eval.Task
import monix.reactive.{Consumer, Observable}

class LoadableBloomFilter[A](bloomFilter: BloomFilter[A], source: Observable[Either[IterationError, A]]) {
val loadFromSource: Task[BloomFilterLoadingResult] = {
Contributor:

I have mixed feelings about this setup, TBH. If the underlying bloom filter can be used only once loading completes, let's make a function which takes a source and returns a task with a ready-to-use bloom filter.

Contributor Author:

For now we are using the underlying bloom filter that way (first load, then use), as it is the simplest possible scenario and the loading time for ETC is relatively short, i.e. 2-3 min.

As the underlying bloom filter is thread safe, we could probably start loading the filter from storage and start state sync at the same time, but I would do that as a separate task. (It could be especially useful if we were to support ETH, where loading nodes would probably take more than 10 min, which is probably not acceptable.)

That's why I would leave it as it is.

source
.consumeWith(Consumer.foldLeftTask(BloomFilterLoadingResult()) { (s, e) =>
e match {
case Left(value) => Task.now(s.copy(error = Some(value)))
case Right(value) => Task(bloomFilter.put(value)).map(_ => s.copy(writtenElements = s.writtenElements + 1))
}
})
.memoizeOnSuccess
}

def put(elem: A): Boolean = bloomFilter.put(elem)

def mightContain(elem: A): Boolean = bloomFilter.mightContain(elem)

def approximateElementCount: Long = bloomFilter.approximateElementCount()
}

object LoadableBloomFilter {
def apply[A](expectedSize: Int, loadingSource: Observable[Either[IterationError, A]])(implicit
f: Funnel[A]
): LoadableBloomFilter[A] = {
new LoadableBloomFilter[A](BloomFilter.create[A](f, expectedSize), loadingSource)
}

case class BloomFilterLoadingResult(writtenElements: Long, error: Option[IterationError])
object BloomFilterLoadingResult {
def apply(): BloomFilterLoadingResult = new BloomFilterLoadingResult(0, None)

def apply(ex: Throwable): BloomFilterLoadingResult = new BloomFilterLoadingResult(0, Some(IterationError(ex)))
}
}
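To make the load-then-use pattern from the discussion above concrete, here is a minimal, self-contained sketch. The source nodeKeys and its literal keys are illustrative stand-ins for the PR's real source, blockchain.mptStateSavedKeys(); the rest uses the APIs added in this file, with the implicit Funnel supplied by the ByteStringFunnel made implicit in SyncStateScheduler.

import akka.util.ByteString
import io.iohk.ethereum.blockchain.sync.LoadableBloomFilter
import io.iohk.ethereum.blockchain.sync.SyncStateScheduler.ByteStringFunnel
import io.iohk.ethereum.db.dataSource.RocksDbDataSource.IterationError
import monix.eval.Task
import monix.execution.Scheduler.Implicits.global
import monix.reactive.Observable

object LoadableBloomFilterExample extends App {
  // Illustrative source; in the PR this comes from blockchain.mptStateSavedKeys().
  val nodeKeys: Observable[Either[IterationError, ByteString]] =
    Observable(Right(ByteString(1.toByte)), Right(ByteString(2.toByte)))

  val filter = LoadableBloomFilter[ByteString](expectedSize = 1000, loadingSource = nodeKeys)

  val program: Task[Boolean] = for {
    result <- filter.loadFromSource // memoized: running it again will not re-read the source
    _ = println(s"loaded ${result.writtenElements} keys, error: ${result.error}")
  } yield filter.mightContain(ByteString(1.toByte))

  println(program.runSyncUnsafe()) // true once loading has completed
}

Because loadFromSource is memoized on success, it can safely be exposed as a val and run by multiple subscribers while the source is consumed only once.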
src/main/scala/io/iohk/ethereum/blockchain/sync/SyncStateScheduler.scala
@@ -4,11 +4,13 @@ import java.util.Comparator

import akka.util.ByteString
import com.google.common.hash.{BloomFilter, Funnel, PrimitiveSink}
import io.iohk.ethereum.blockchain.sync.LoadableBloomFilter.BloomFilterLoadingResult
import io.iohk.ethereum.blockchain.sync.SyncStateScheduler._
import io.iohk.ethereum.domain.{Account, Blockchain}
import io.iohk.ethereum.mpt.{BranchNode, ExtensionNode, HashNode, LeafNode, MerklePatriciaTrie, MptNode}
import io.vavr.collection.PriorityQueue
import io.iohk.ethereum.network.p2p.messages.PV63.MptNodeEncoders._
import monix.eval.Task

import scala.annotation.tailrec
import scala.util.Try
@@ -40,7 +42,9 @@ import scala.util.Try
*
* The important part is that nodes retrieved by getMissingNodes must eventually be provided back for the scheduler to make progress
*/
class SyncStateScheduler(blockchain: Blockchain, bloomFilter: BloomFilter[ByteString]) {
class SyncStateScheduler(blockchain: Blockchain, bloomFilter: LoadableBloomFilter[ByteString]) {

val loadFilterFromBlockchain: Task[BloomFilterLoadingResult] = bloomFilter.loadFromSource

def initState(targetRootHash: ByteString): Option[SchedulerState] = {
if (targetRootHash == emptyStateRootHash) {
@@ -268,7 +272,7 @@ object SyncStateScheduler {

case object StorageNode extends NodeRequest

object ByteStringFunnel extends Funnel[ByteString] {
implicit object ByteStringFunnel extends Funnel[ByteString] {
override def funnel(from: ByteString, into: PrimitiveSink): Unit = {
into.putBytes(from.toArray)
}
@@ -277,10 +281,14 @@
def getEmptyFilter(expectedFilterSize: Int): BloomFilter[ByteString] = {
BloomFilter.create[ByteString](ByteStringFunnel, expectedFilterSize)
}
// TODO [ETCM-213] add a method to load the bloom filter after node restart. The perfect way to do it would be to expose an Observable
// in RocksDbDataSource which underneath would use RockDbIterator to traverse the whole namespace.

def apply(blockchain: Blockchain, expectedBloomFilterSize: Int): SyncStateScheduler = {
new SyncStateScheduler(blockchain, getEmptyFilter(expectedBloomFilterSize))
// The provided source, i.e. mptStateSavedKeys(), is guaranteed to finish on the first `Left` element, which means that the
// returned error is the reason why loading stopped
new SyncStateScheduler(
blockchain,
LoadableBloomFilter[ByteString](expectedBloomFilterSize, blockchain.mptStateSavedKeys())
)
}

final case class StateNodeRequest(
Expand Down Expand Up @@ -463,5 +471,4 @@ object SyncStateScheduler {
object ProcessingStatistics {
def apply(): ProcessingStatistics = new ProcessingStatistics(0, 0, 0)
}

}
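For context, commit a6258da ([ETCM-213] Update SyncStateActor to load bloom filter at start) wires the new loadFilterFromBlockchain task into sync startup. That change is outside this diff; below is a minimal sketch of the assumed wiring, where startStateSync, beginSync, and the println logging are illustrative, not the PR's actual code.

import io.iohk.ethereum.blockchain.sync.SyncStateScheduler
import monix.eval.Task

object SyncStartSketch {
  // Assumed wiring: run the (memoized) filter load before state sync begins.
  // A partial load is only logged, not fatal: missing filter entries just mean
  // some already-persisted nodes may be re-checked or re-downloaded.
  def startStateSync(scheduler: SyncStateScheduler, beginSync: Task[Unit]): Task[Unit] =
    scheduler.loadFilterFromBlockchain.flatMap { result =>
      result.error.foreach(e => println(s"Bloom filter loading stopped early: $e"))
      println(s"Bloom filter loaded with ${result.writtenElements} elements")
      beginSync
    }
}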