diff --git a/src/it/scala/io/iohk/ethereum/db/RockDbIteratorSpec.scala b/src/it/scala/io/iohk/ethereum/db/RockDbIteratorSpec.scala
new file mode 100644
index 0000000000..50e3055279
--- /dev/null
+++ b/src/it/scala/io/iohk/ethereum/db/RockDbIteratorSpec.scala
@@ -0,0 +1,155 @@
+package io.iohk.ethereum.db
+
+import java.nio.file.Files
+
+import akka.util.ByteString
+import cats.effect.Resource
+import cats.effect.concurrent.{Deferred, Ref}
+import io.iohk.ethereum.db.dataSource.{DataSourceUpdateOptimized, RocksDbConfig, RocksDbDataSource}
+import io.iohk.ethereum.db.storage.{EvmCodeStorage, Namespaces, NodeStorage}
+import io.iohk.ethereum.{FlatSpecBase, ResourceFixtures}
+import monix.eval.Task
+import monix.reactive.{Consumer, Observable}
+import org.scalatest.matchers.should.Matchers
+
+import scala.util.Random
+
+class RockDbIteratorSpec extends FlatSpecBase with ResourceFixtures with Matchers {
+  type Fixture = RocksDbDataSource
+
+  override def fixtureResource: Resource[Task, RocksDbDataSource] = RockDbIteratorSpec.buildRockDbResource()
+
+  def genRandomArray(): Array[Byte] = {
+    val arr = new Array[Byte](32)
+    Random.nextBytes(arr)
+    arr
+  }
+
+  def genRandomByteString(): ByteString = {
+    ByteString.fromArrayUnsafe(genRandomArray())
+  }
+
+  def writeNValuesToDb(n: Int, db: RocksDbDataSource, namespace: IndexedSeq[Byte]): Task[Unit] = {
+    val iterable = (0 until n)
+    Observable.fromIterable(iterable).foreachL { _ =>
+      db.update(Seq(DataSourceUpdateOptimized(namespace, Seq(), Seq((genRandomArray(), genRandomArray())))))
+    }
+  }
+
+  it should "cancel ongoing iteration" in testCaseT { db =>
+    val largeNum = 1000000
+    val finishMark = 20000
+    for {
+      counter <- Ref.of[Task, Int](0)
+      cancelMark <- Deferred[Task, Unit]
+      _ <- writeNValuesToDb(largeNum, db, Namespaces.NodeNamespace)
+      fib <- db
+        .iterate(Namespaces.NodeNamespace)
+        .map(_.right.get)
+        .consumeWith(Consumer.foreachEval[Task, (Array[Byte], Array[Byte])] { _ =>
+          for {
+            cur <- counter.updateAndGet(i => i + 1)
+            _ <- if (cur == finishMark) cancelMark.complete(()) else Task.unit
+          } yield ()
+        })
+        .start
+      _ <- cancelMark.get
+      // keep in mind that this test also checks that all underlying rocksdb resources have been cleaned up: if cancel
+      // did not close the underlying DbIterator, the whole test would kill the JVM with a rocksdb error at the native
+      // level, because iterators need to be closed before closing the db.
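+      // (cancelling the consuming fiber should also release the RocksIterator, as iterate() acquires it via Observable.fromResource)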
+ _ <- fib.cancel + finalCounter <- counter.get + } yield { + assert(finalCounter < largeNum) + } + } + + it should "read all key values in db" in testCaseT { db => + val largeNum = 100000 + for { + counter <- Ref.of[Task, Int](0) + _ <- writeNValuesToDb(largeNum, db, Namespaces.NodeNamespace) + _ <- db + .iterate(Namespaces.NodeNamespace) + .map(_.right.get) + .consumeWith(Consumer.foreachEval[Task, (Array[Byte], Array[Byte])] { _ => + counter.update(current => current + 1) + }) + finalCounter <- counter.get + } yield { + assert(finalCounter == largeNum) + } + } + + it should "iterate over keys and values from different namespaces" in testCaseT { db => + val codeStorage = new EvmCodeStorage(db) + val codeKeyValues = (1 to 10).map(i => (ByteString(i.toByte), ByteString(i.toByte))).toList + + val nodeStorage = new NodeStorage(db) + val nodeKeyValues = (20 to 30).map(i => (ByteString(i.toByte), ByteString(i.toByte).toArray)).toList + + for { + _ <- Task(codeStorage.update(Seq(), codeKeyValues).commit()) + _ <- Task(nodeStorage.update(Seq(), nodeKeyValues)) + result <- Task.parZip2( + codeStorage.storageContent.map(_.right.get).map(_._1).toListL, + nodeStorage.storageContent.map(_.right.get).map(_._1).toListL + ) + (codeResult, nodeResult) = result + } yield { + codeResult shouldEqual codeKeyValues.map(_._1) + nodeResult shouldEqual nodeKeyValues.map(_._1) + } + } + + it should "iterate over keys and values " in testCaseT { db => + val keyValues = (1 to 100).map(i => (ByteString(i.toByte), ByteString(i.toByte))).toList + for { + _ <- Task( + db.update( + Seq( + DataSourceUpdateOptimized(Namespaces.NodeNamespace, Seq(), keyValues.map(e => (e._1.toArray, e._2.toArray))) + ) + ) + ) + elems <- db.iterate(Namespaces.NodeNamespace).map(_.right.get).toListL + } yield { + val deserialized = elems.map { case (bytes, bytes1) => (ByteString(bytes), ByteString(bytes1)) } + assert(elems.size == keyValues.size) + assert(keyValues == deserialized) + } + } + + it should "return empty list when iterating empty db" in testCaseT { db => + for { + elems <- db.iterate().toListL + } yield { + assert(elems.isEmpty) + } + } +} + +object RockDbIteratorSpec { + def getRockDbTestConfig(dbPath: String) = { + new RocksDbConfig { + override val createIfMissing: Boolean = true + override val paranoidChecks: Boolean = false + override val path: String = dbPath + override val maxThreads: Int = 1 + override val maxOpenFiles: Int = 32 + override val verifyChecksums: Boolean = false + override val levelCompaction: Boolean = true + override val blockSize: Long = 16384 + override val blockCacheSize: Long = 33554432 + } + } + + def buildRockDbResource(): Resource[Task, RocksDbDataSource] = { + Resource.make { + Task { + val tempDir = Files.createTempDirectory("temp-iter-dir") + RocksDbDataSource(getRockDbTestConfig(tempDir.toAbsolutePath.toString), Namespaces.nsSeq) + } + }(db => Task(db.destroy())) + } +} diff --git a/src/it/scala/io/iohk/ethereum/sync/FastSyncItSpec.scala b/src/it/scala/io/iohk/ethereum/sync/FastSyncItSpec.scala index 3937a59aed..38cb7ea116 100644 --- a/src/it/scala/io/iohk/ethereum/sync/FastSyncItSpec.scala +++ b/src/it/scala/io/iohk/ethereum/sync/FastSyncItSpec.scala @@ -133,12 +133,31 @@ class FastSyncItSpec extends FlatSpecBase with Matchers with BeforeAndAfterAll { } } + it should "sync state to peer from partially synced state" in customTestCaseResourceM( + FakePeer.start2FakePeersRes() + ) { case (peer1, peer2) => + for { + _ <- peer2.importBlocksUntil(2000)(updateStateAtBlock(1500)) + _ <- 
peer2.importBlocksUntil(3000)(updateStateAtBlock(2500, 1000, 2000)) + _ <- peer1.importBlocksUntil(2000)(updateStateAtBlock(1500)) + _ <- peer1.startWithState() + _ <- peer1.connectToPeers(Set(peer2.node)) + _ <- peer1.startFastSync().delayExecution(50.milliseconds) + _ <- peer1.waitForFastSyncFinish() + } yield { + assert(peer1.bl.getBestBlockNumber() == peer2.bl.getBestBlockNumber() - peer2.testSyncConfig.pivotBlockOffset) + } + } } object FastSyncItSpec { - def updateWorldWithNAccounts(n: Int, world: InMemoryWorldStateProxy): InMemoryWorldStateProxy = { - val resultWorld = (0 until n).foldLeft(world) { (world, num) => + def updateWorldWithAccounts( + startAccount: Int, + endAccount: Int, + world: InMemoryWorldStateProxy + ): InMemoryWorldStateProxy = { + val resultWorld = (startAccount until endAccount).foldLeft(world) { (world, num) => val randomBalance = num val randomAddress = Address(num) val codeBytes = BigInt(num).toByteArray @@ -152,10 +171,14 @@ object FastSyncItSpec { InMemoryWorldStateProxy.persistState(resultWorld) } - def updateStateAtBlock(blockWithUpdate: BigInt): (BigInt, InMemoryWorldStateProxy) => InMemoryWorldStateProxy = { + def updateStateAtBlock( + blockWithUpdate: BigInt, + startAccount: Int = 0, + endAccount: Int = 1000 + ): (BigInt, InMemoryWorldStateProxy) => InMemoryWorldStateProxy = { (blockNr: BigInt, world: InMemoryWorldStateProxy) => if (blockNr == blockWithUpdate) { - updateWorldWithNAccounts(1000, world) + updateWorldWithAccounts(startAccount, endAccount, world) } else { IdentityUpdate(blockNr, world) } diff --git a/src/it/scala/io/iohk/ethereum/sync/util/FastSyncItSpecUtils.scala b/src/it/scala/io/iohk/ethereum/sync/util/FastSyncItSpecUtils.scala index 86fcf730ce..26d14a318b 100644 --- a/src/it/scala/io/iohk/ethereum/sync/util/FastSyncItSpecUtils.scala +++ b/src/it/scala/io/iohk/ethereum/sync/util/FastSyncItSpecUtils.scala @@ -4,6 +4,7 @@ import akka.util.ByteString import cats.effect.Resource import io.iohk.ethereum.Mocks.MockValidatorsAlwaysSucceed import io.iohk.ethereum.blockchain.sync.FastSync +import io.iohk.ethereum.blockchain.sync.FastSync.SyncState import io.iohk.ethereum.crypto.kec256 import io.iohk.ethereum.domain.Address import io.iohk.ethereum.mpt.{HashNode, MptNode, MptTraversals} @@ -89,6 +90,17 @@ object FastSyncItSpecUtils { go(0) } + + def startWithState(): Task[Unit] = { + Task { + val currentBest = bl.getBestBlock().header + val safeTarget = currentBest.number + syncConfig.fastSyncBlockValidationX + val nextToValidate = currentBest.number + 1 + val syncState = SyncState(currentBest, safeTarget, Seq(), Seq(), 0, 0, currentBest.number, nextToValidate) + storagesInstance.storages.fastSyncStateStorage.putSyncState(syncState) + }.map(_ => ()) + } + } object FakePeer { diff --git a/src/it/scala/io/iohk/ethereum/txExecTest/util/DumpChainApp.scala b/src/it/scala/io/iohk/ethereum/txExecTest/util/DumpChainApp.scala index 4492959d4a..e542a778e8 100644 --- a/src/it/scala/io/iohk/ethereum/txExecTest/util/DumpChainApp.scala +++ b/src/it/scala/io/iohk/ethereum/txExecTest/util/DumpChainApp.scala @@ -22,12 +22,13 @@ import io.iohk.ethereum.nodebuilder.{AuthHandshakerBuilder, NodeKeyBuilder, Secu import io.iohk.ethereum.utils.{Config, NodeStatus, ServerStatus} import java.util.concurrent.atomic.AtomicReference -import io.iohk.ethereum.db.dataSource.DataSourceBatchUpdate +import io.iohk.ethereum.db.dataSource.{DataSourceBatchUpdate, RocksDbDataSource} import org.bouncycastle.util.encoders.Hex import scala.concurrent.duration._ import 
io.iohk.ethereum.domain.BlockHeader.HeaderExtraFields.HefEmpty import io.iohk.ethereum.network.discovery.DiscoveryConfig +import monix.reactive.Observable object DumpChainApp extends App with NodeKeyBuilder with SecureRandomBuilder with AuthHandshakerBuilder { val conf = ConfigFactory.load("txExecTest/chainDump.conf") @@ -197,4 +198,6 @@ class BlockchainMock(genesisHash: ByteString) extends Blockchain { override def getStateStorage: StateStorage = ??? override def getLatestCheckpointBlockNumber(): BigInt = ??? + + override def mptStateSavedKeys(): Observable[Either[RocksDbDataSource.IterationError, NodeHash]] = ??? } diff --git a/src/main/scala/io/iohk/ethereum/blockchain/sync/LoadableBloomFilter.scala b/src/main/scala/io/iohk/ethereum/blockchain/sync/LoadableBloomFilter.scala new file mode 100644 index 0000000000..0b0e1426ce --- /dev/null +++ b/src/main/scala/io/iohk/ethereum/blockchain/sync/LoadableBloomFilter.scala @@ -0,0 +1,41 @@ +package io.iohk.ethereum.blockchain.sync + +import com.google.common.hash.{BloomFilter, Funnel} +import io.iohk.ethereum.blockchain.sync.LoadableBloomFilter.BloomFilterLoadingResult +import io.iohk.ethereum.db.dataSource.RocksDbDataSource.IterationError +import monix.eval.Task +import monix.reactive.{Consumer, Observable} + +class LoadableBloomFilter[A](bloomFilter: BloomFilter[A], source: Observable[Either[IterationError, A]]) { + val loadFromSource: Task[BloomFilterLoadingResult] = { + source + .consumeWith(Consumer.foldLeftTask(BloomFilterLoadingResult()) { (s, e) => + e match { + case Left(value) => Task.now(s.copy(error = Some(value))) + case Right(value) => Task(bloomFilter.put(value)).map(_ => s.copy(writtenElements = s.writtenElements + 1)) + } + }) + .memoizeOnSuccess + } + + def put(elem: A): Boolean = bloomFilter.put(elem) + + def mightContain(elem: A): Boolean = bloomFilter.mightContain(elem) + + def approximateElementCount: Long = bloomFilter.approximateElementCount() +} + +object LoadableBloomFilter { + def apply[A](expectedSize: Int, loadingSource: Observable[Either[IterationError, A]])(implicit + f: Funnel[A] + ): LoadableBloomFilter[A] = { + new LoadableBloomFilter[A](BloomFilter.create[A](f, expectedSize), loadingSource) + } + + case class BloomFilterLoadingResult(writtenElements: Long, error: Option[IterationError]) + object BloomFilterLoadingResult { + def apply(): BloomFilterLoadingResult = new BloomFilterLoadingResult(0, None) + + def apply(ex: Throwable): BloomFilterLoadingResult = new BloomFilterLoadingResult(0, Some(IterationError(ex))) + } +} diff --git a/src/main/scala/io/iohk/ethereum/blockchain/sync/SyncStateScheduler.scala b/src/main/scala/io/iohk/ethereum/blockchain/sync/SyncStateScheduler.scala index ba9fedfbd4..8fabea7ccc 100644 --- a/src/main/scala/io/iohk/ethereum/blockchain/sync/SyncStateScheduler.scala +++ b/src/main/scala/io/iohk/ethereum/blockchain/sync/SyncStateScheduler.scala @@ -4,11 +4,13 @@ import java.util.Comparator import akka.util.ByteString import com.google.common.hash.{BloomFilter, Funnel, PrimitiveSink} +import io.iohk.ethereum.blockchain.sync.LoadableBloomFilter.BloomFilterLoadingResult import io.iohk.ethereum.blockchain.sync.SyncStateScheduler._ import io.iohk.ethereum.domain.{Account, Blockchain} import io.iohk.ethereum.mpt.{BranchNode, ExtensionNode, HashNode, LeafNode, MerklePatriciaTrie, MptNode} import io.vavr.collection.PriorityQueue import io.iohk.ethereum.network.p2p.messages.PV63.MptNodeEncoders._ +import monix.eval.Task import scala.annotation.tailrec import scala.util.Try @@ -40,7 +42,9 @@ 
import scala.util.Try
 *
 * Important part is that nodes retrieved by getMissingNodes, must eventually be provided for scheduler to make progress
 */
-class SyncStateScheduler(blockchain: Blockchain, bloomFilter: BloomFilter[ByteString]) {
+class SyncStateScheduler(blockchain: Blockchain, bloomFilter: LoadableBloomFilter[ByteString]) {
+
+  val loadFilterFromBlockchain: Task[BloomFilterLoadingResult] = bloomFilter.loadFromSource
 
   def initState(targetRootHash: ByteString): Option[SchedulerState] = {
     if (targetRootHash == emptyStateRootHash) {
@@ -268,7 +272,7 @@ object SyncStateScheduler {
 
   case object StorageNode extends NodeRequest
 
-  object ByteStringFunnel extends Funnel[ByteString] {
+  implicit object ByteStringFunnel extends Funnel[ByteString] {
     override def funnel(from: ByteString, into: PrimitiveSink): Unit = {
       into.putBytes(from.toArray)
     }
@@ -277,10 +281,14 @@ object SyncStateScheduler {
   def getEmptyFilter(expectedFilterSize: Int): BloomFilter[ByteString] = {
     BloomFilter.create[ByteString](ByteStringFunnel, expectedFilterSize)
   }
-  // TODO [ETCM-213] add method to load bloom filter after node restart. Perfect way to do it would be to expose Observable
-  // in RocksDBDataSource which underneath would use RockDbIterator which would traverse whole namespace.
+
   def apply(blockchain: Blockchain, expectedBloomFilterSize: Int): SyncStateScheduler = {
-    new SyncStateScheduler(blockchain, getEmptyFilter(expectedBloomFilterSize))
+    // The provided source, i.e. mptStateSavedKeys(), is guaranteed to finish on the first `Left` element, which means
+    // the returned error is the reason why loading stopped
+    new SyncStateScheduler(
+      blockchain,
+      LoadableBloomFilter[ByteString](expectedBloomFilterSize, blockchain.mptStateSavedKeys())
+    )
   }
 
   final case class StateNodeRequest(
@@ -463,5 +471,4 @@ object SyncStateScheduler {
   object ProcessingStatistics {
     def apply(): ProcessingStatistics = new ProcessingStatistics(0, 0, 0)
   }
-
 }
diff --git a/src/main/scala/io/iohk/ethereum/blockchain/sync/SyncStateSchedulerActor.scala b/src/main/scala/io/iohk/ethereum/blockchain/sync/SyncStateSchedulerActor.scala
index 297869e84c..57c6803c68 100644
--- a/src/main/scala/io/iohk/ethereum/blockchain/sync/SyncStateSchedulerActor.scala
+++ b/src/main/scala/io/iohk/ethereum/blockchain/sync/SyncStateSchedulerActor.scala
@@ -2,9 +2,11 @@ package io.iohk.ethereum.blockchain.sync
 
 import akka.actor.{Actor, ActorLogging, ActorRef, Props, Timers}
 import akka.util.ByteString
+import io.iohk.ethereum.blockchain.sync.LoadableBloomFilter.BloomFilterLoadingResult
 import io.iohk.ethereum.blockchain.sync.SyncStateDownloaderActor.{CancelDownload, RegisterScheduler}
 import io.iohk.ethereum.blockchain.sync.SyncStateScheduler.{ProcessingStatistics, SchedulerState, SyncResponse}
 import io.iohk.ethereum.blockchain.sync.SyncStateSchedulerActor.{
+  BloomFilterResult,
   GetMissingNodes,
   MissingNodes,
   PrintInfo,
@@ -12,10 +14,12 @@ import io.iohk.ethereum.blockchain.sync.SyncStateSchedulerActor.{
   RestartRequested,
   StartSyncingTo,
   StateSyncFinished,
+  SyncStateSchedulerActorCommand,
   WaitingForNewTargetBlock
 }
 import io.iohk.ethereum.utils.ByteStringUtils
 import io.iohk.ethereum.utils.Config.SyncConfig
+import monix.execution.Scheduler
 
 import scala.concurrent.duration._
 
@@ -23,16 +27,57 @@ class SyncStateSchedulerActor(downloader: ActorRef, sync: SyncStateScheduler, sy
   extends Actor
     with ActorLogging
     with Timers {
+  implicit val scheduler = Scheduler(context.dispatcher)
+
+  val loadingCancelable = sync.loadFilterFromBlockchain.runAsync {
+    case Left(value) =>
+      log.error(
+        "Unexpected error while loading bloom filter. Starting state sync with an empty bloom filter, " +
+          "which may result in degraded performance",
+        value
+      )
+      self ! BloomFilterResult(BloomFilterLoadingResult())
+    case Right(value) =>
+      log.info("Bloom filter loading finished")
+      self ! BloomFilterResult(value)
+  }
+
+  def waitingForBloomFilterToLoad(lastReceivedCommand: Option[(SyncStateSchedulerActorCommand, ActorRef)]): Receive = {
+    case BloomFilterResult(result) =>
+      log.debug(
+        s"Loaded ${result.writtenElements} already known elements from storage into the bloom filter; the error while " +
+          s"loading was ${result.error}"
+      )
+      lastReceivedCommand match {
+        case Some((startSignal: StartSyncingTo, sender)) =>
+          val initStats = ProcessingStatistics().addSaved(result.writtenElements)
+          val initState = startSyncing(startSignal.stateRoot, startSignal.blockNumber)
+          context become (syncing(initState, initStats, startSignal.blockNumber, sender))
+        case Some((restartSignal: RestartRequested.type, sender)) =>
+          sender ! WaitingForNewTargetBlock
+          context.become(idle(ProcessingStatistics().addSaved(result.writtenElements)))
+        case _ =>
+          context.become(idle(ProcessingStatistics().addSaved(result.writtenElements)))
+      }
+
+    case command: SyncStateSchedulerActorCommand =>
+      context.become(waitingForBloomFilterToLoad(Some((command, sender()))))
+  }
+
+  private def startSyncing(root: ByteString, bn: BigInt): SchedulerState = {
+    timers.startTimerAtFixedRate(PrintInfoKey, PrintInfo, 30.seconds)
+    log.info(s"Starting state sync to root ${ByteStringUtils.hash2string(root)} on block ${bn}")
+    //TODO handle the case when we already have the root, i.e. the state is synced up to this point
+    val initState = sync.initState(root).get
+    val (initialMissing, state1) = initState.getAllMissingHashes
+    downloader ! RegisterScheduler
+    downloader ! GetMissingNodes(initialMissing)
+    state1
+  }
 
   def idle(processingStatistics: ProcessingStatistics): Receive = {
     case StartSyncingTo(root, bn) =>
-      timers.startTimerAtFixedRate(PrintInfoKey, PrintInfo, 30.seconds)
-      log.info(s"Starting state sync to root ${ByteStringUtils.hash2string(root)} on block ${bn}")
-      //TODO handle case when we already have root i.e state is synced up to this point
-      val initState = sync.initState(root).get
-      val (initialMissing, state1) = initState.getAllMissingHashes
-      downloader ! RegisterScheduler
-      downloader ! GetMissingNodes(initialMissing)
+      val state1 = startSyncing(root, bn)
       context become (syncing(state1, processingStatistics, bn, sender()))
     case PrintInfo =>
       log.info(s"Waiting for target block to start the state sync")
@@ -62,7 +107,7 @@ class SyncStateSchedulerActor(downloader: ActorRef, sync: SyncStateScheduler, sy
       log.debug(s"Received {} new nodes to process", nodes.size)
      // Current SyncStateDownloaderActor makes sure that there is no not requested or duplicated values in its response.
      // so we can ignore those errors.
- // TODO make processing async as sometimes downloader sits idle + //TODO [ETCM-275] process responses asynchronously to avoid steep rise of pending requests after pivot block update sync.processResponses(currentState, nodes) match { case Left(value) => log.error(s"Critical error while state syncing ${value}, stopping state sync") @@ -122,8 +167,12 @@ class SyncStateSchedulerActor(downloader: ActorRef, sync: SyncStateScheduler, sy context.become(idle(currentStats)) } - override def receive: Receive = idle(ProcessingStatistics()) + override def receive: Receive = waitingForBloomFilterToLoad(None) + override def postStop(): Unit = { + loadingCancelable.cancel() + super.postStop() + } } object SyncStateSchedulerActor { @@ -134,15 +183,17 @@ object SyncStateSchedulerActor { final case object PrintInfo final case object PrintInfoKey - case class StartSyncingTo(stateRoot: ByteString, blockNumber: BigInt) + sealed trait SyncStateSchedulerActorCommand + case class StartSyncingTo(stateRoot: ByteString, blockNumber: BigInt) extends SyncStateSchedulerActorCommand + case object RestartRequested extends SyncStateSchedulerActorCommand - case class GetMissingNodes(nodesToGet: List[ByteString]) + sealed trait SyncStateSchedulerActorResponse + case object StateSyncFinished extends SyncStateSchedulerActorResponse + case object WaitingForNewTargetBlock extends SyncStateSchedulerActorResponse + case class GetMissingNodes(nodesToGet: List[ByteString]) case class MissingNodes(missingNodes: List[SyncResponse], downloaderCapacity: Int) - case object StateSyncFinished - - case object RestartRequested - case object WaitingForNewTargetBlock + case class BloomFilterResult(res: BloomFilterLoadingResult) } diff --git a/src/main/scala/io/iohk/ethereum/db/dataSource/DataSource.scala b/src/main/scala/io/iohk/ethereum/db/dataSource/DataSource.scala index fbfb5a8712..4bdd76ebf2 100644 --- a/src/main/scala/io/iohk/ethereum/db/dataSource/DataSource.scala +++ b/src/main/scala/io/iohk/ethereum/db/dataSource/DataSource.scala @@ -1,5 +1,8 @@ package io.iohk.ethereum.db.dataSource +import io.iohk.ethereum.db.dataSource.RocksDbDataSource.IterationError +import monix.reactive.Observable + trait DataSource { import DataSource._ @@ -29,7 +32,7 @@ trait DataSource { * @param key the key retrieve the value. * @return the value associated with the passed key. */ - def getOptimized(key: Array[Byte]): Option[Array[Byte]] + def getOptimized(namespace: Namespace, key: Array[Byte]): Option[Array[Byte]] /** * This function updates the DataSource by deleting, updating and inserting new (key-value) pairs. @@ -51,6 +54,17 @@ trait DataSource { * This function closes the DataSource, if it is not yet closed, and deletes all the files used by it. 
   */
  def destroy(): Unit
+
+  /**
+    * Returns key-value pairs until the first error occurs or until the whole db has been iterated
+    */
+  def iterate(): Observable[Either[IterationError, (Array[Byte], Array[Byte])]]
+
+  /**
+    * Returns key-value pairs until the first error occurs or until the whole namespace has been iterated
+    */
+  def iterate(namespace: Namespace): Observable[Either[IterationError, (Array[Byte], Array[Byte])]]
+
 }
 
 object DataSource {
diff --git a/src/main/scala/io/iohk/ethereum/db/dataSource/DataSourceUpdate.scala b/src/main/scala/io/iohk/ethereum/db/dataSource/DataSourceUpdate.scala
index 91c7e38a89..57af85c6cf 100644
--- a/src/main/scala/io/iohk/ethereum/db/dataSource/DataSourceUpdate.scala
+++ b/src/main/scala/io/iohk/ethereum/db/dataSource/DataSourceUpdate.scala
@@ -23,5 +23,8 @@ case class DataSourceUpdate(namespace: Namespace, toRemove: Seq[Key], toUpsert:
  * @param toUpsert which includes all the (key-value) pairs to be inserted into the DataSource.
  *                 If a key is already in the DataSource its value will be updated.
  */
-case class DataSourceUpdateOptimized(toRemove: Seq[Array[Byte]], toUpsert: Seq[(Array[Byte], Array[Byte])])
-    extends DataUpdate
+case class DataSourceUpdateOptimized(
+    namespace: Namespace,
+    toRemove: Seq[Array[Byte]],
+    toUpsert: Seq[(Array[Byte], Array[Byte])]
+) extends DataUpdate
diff --git a/src/main/scala/io/iohk/ethereum/db/dataSource/EphemDataSource.scala b/src/main/scala/io/iohk/ethereum/db/dataSource/EphemDataSource.scala
index 6269080ecf..d36e14b4d9 100644
--- a/src/main/scala/io/iohk/ethereum/db/dataSource/EphemDataSource.scala
+++ b/src/main/scala/io/iohk/ethereum/db/dataSource/EphemDataSource.scala
@@ -1,7 +1,10 @@
 package io.iohk.ethereum.db.dataSource
 
 import java.nio.ByteBuffer
+
 import io.iohk.ethereum.db.dataSource.DataSource._
+import io.iohk.ethereum.db.dataSource.RocksDbDataSource.IterationError
+import monix.reactive.Observable
 
 class EphemDataSource(var storage: Map[ByteBuffer, Array[Byte]]) extends DataSource {
 
@@ -17,14 +20,16 @@ class EphemDataSource(var storage: Map[ByteBuffer, Array[Byte]]) extends DataSou
     storage.get(ByteBuffer.wrap((namespace ++ key).toArray)).map(_.toIndexedSeq)
   }
 
-  override def getOptimized(key: Array[Byte]): Option[Array[Byte]] = storage.get(ByteBuffer.wrap(key))
+  override def getOptimized(namespace: Namespace, key: Array[Byte]): Option[Array[Byte]] = {
+    get(namespace, key.toIndexedSeq).map(_.toArray)
+  }
 
   override def update(dataSourceUpdates: Seq[DataUpdate]): Unit = synchronized {
     dataSourceUpdates.foreach {
       case DataSourceUpdate(namespace, toRemove, toUpsert) =>
         update(namespace, toRemove, toUpsert)
-      case DataSourceUpdateOptimized(toRemove, toUpsert) =>
-        updateOptimized(toRemove, toUpsert)
+      case DataSourceUpdateOptimized(namespace, toRemove, toUpsert) =>
+        updateOptimized(namespace, toRemove, toUpsert)
     }
   }
 
@@ -37,13 +42,13 @@ class EphemDataSource(var storage: Map[ByteBuffer, Array[Byte]]) extends DataSou
     storage = afterUpdate
   }
 
-  private def updateOptimized(toRemove: Seq[Array[Byte]], toUpsert: Seq[(Array[Byte], Array[Byte])]): Unit =
-    synchronized {
-      val afterRemoval = toRemove.foldLeft(storage)((storage, key) => storage - ByteBuffer.wrap(key))
-      val afterUpdate =
-        toUpsert.foldLeft(afterRemoval)((storage, toUpdate) => storage + (ByteBuffer.wrap(toUpdate._1) -> toUpdate._2))
-      storage = afterUpdate
-    }
+  private def updateOptimized(
+      namespace: Namespace,
+      toRemove: Seq[Array[Byte]],
+      toUpsert: Seq[(Array[Byte], Array[Byte])]
+  ): Unit = synchronized {
+    update(namespace, toRemove.map(_.toIndexedSeq), toUpsert.map(s
=> (s._1.toIndexedSeq, s._2.toIndexedSeq))) + } override def clear(): Unit = synchronized { storage = Map() @@ -52,6 +57,18 @@ class EphemDataSource(var storage: Map[ByteBuffer, Array[Byte]]) extends DataSou override def close(): Unit = () override def destroy(): Unit = () + + override def iterate(): Observable[Either[IterationError, (Array[Byte], Array[Byte])]] = { + Observable.fromIterable(storage.toList.map { case (key, value) => Right(key.array(), value) }) + } + + override def iterate(namespace: Namespace): Observable[Either[IterationError, (Array[Byte], Array[Byte])]] = { + val namespaceVals = storage collect { + case (buffer, bytes) if buffer.array().startsWith(namespace) => Right(buffer.array(), bytes) + } + + Observable.fromIterable(namespaceVals) + } } object EphemDataSource { diff --git a/src/main/scala/io/iohk/ethereum/db/dataSource/RocksDbDataSource.scala b/src/main/scala/io/iohk/ethereum/db/dataSource/RocksDbDataSource.scala index 8949f6f632..67b99c2211 100644 --- a/src/main/scala/io/iohk/ethereum/db/dataSource/RocksDbDataSource.scala +++ b/src/main/scala/io/iohk/ethereum/db/dataSource/RocksDbDataSource.scala @@ -1,8 +1,13 @@ package io.iohk.ethereum.db.dataSource import java.util.concurrent.locks.ReentrantReadWriteLock + +import cats.effect.Resource import io.iohk.ethereum.db.dataSource.DataSource._ +import io.iohk.ethereum.db.dataSource.RocksDbDataSource.{IterationError, IterationFinished} import io.iohk.ethereum.utils.TryWithResources.withResources +import monix.eval.Task +import monix.reactive.Observable import org.rocksdb._ import org.slf4j.LoggerFactory @@ -53,11 +58,11 @@ class RocksDbDataSource( * @param key the key retrieve the value. * @return the value associated with the passed key. */ - override def getOptimized(key: Array[Byte]): Option[Array[Byte]] = { + override def getOptimized(namespace: Namespace, key: Array[Byte]): Option[Array[Byte]] = { assureNotClosed() RocksDbDataSource.dbLock.readLock().lock() try { - Option(db.get(readOptions, key)) + Option(db.get(handles(namespace), readOptions, key)) } catch { case NonFatal(e) => logger.error(s"Not found associated value to a key: $key, cause: {}", e.getMessage) @@ -80,11 +85,11 @@ class RocksDbDataSource( } toUpsert.foreach { case (k, v) => batch.put(handles(namespace), k.toArray, v.toArray) } - case DataSourceUpdateOptimized(toRemove, toUpsert) => + case DataSourceUpdateOptimized(namespace, toRemove, toUpsert) => toRemove.foreach { key => - batch.delete(key) + batch.delete(handles(namespace), key) } - toUpsert.foreach { case (k, v) => batch.put(k, v) } + toUpsert.foreach { case (k, v) => batch.put(handles(namespace), k, v) } } db.write(writeOptions, batch) } @@ -101,6 +106,38 @@ class RocksDbDataSource( } } + private def dbIterator: Resource[Task, RocksIterator] = { + Resource.fromAutoCloseable(Task(db.newIterator())) + } + + private def namespaceIterator(namespace: Namespace): Resource[Task, RocksIterator] = { + Resource.fromAutoCloseable(Task(db.newIterator(handles(namespace)))) + } + + private def moveIterator(it: RocksIterator): Observable[Either[IterationError, (Array[Byte], Array[Byte])]] = { + Observable + .fromTask(Task(it.seekToFirst())) + .flatMap { _ => + Observable.repeatEvalF(for { + isValid <- Task(it.isValid) + item <- if (isValid) Task(Right(it.key(), it.value())) else Task.raiseError(IterationFinished) + _ <- Task(it.next()) + } yield item) + } + .onErrorHandleWith { + case IterationFinished => Observable.empty + case ex => Observable(Left(IterationError(ex))) + } + } + + def iterate(): 
Observable[Either[IterationError, (Array[Byte], Array[Byte])]] = { + Observable.fromResource(dbIterator).flatMap(it => moveIterator(it)) + } + + def iterate(namespace: Namespace): Observable[Either[IterationError, (Array[Byte], Array[Byte])]] = { + Observable.fromResource(namespaceIterator(namespace)).flatMap(it => moveIterator(it)) + } + /** * This function updates the DataSource by deleting all the (key-value) pairs in it. */ @@ -202,6 +239,8 @@ trait RocksDbConfig { } object RocksDbDataSource { + case object IterationFinished extends RuntimeException + case class IterationError(ex: Throwable) /** * The rocksdb implementation acquires a lock from the operating system to prevent misuse diff --git a/src/main/scala/io/iohk/ethereum/db/storage/AppStateStorage.scala b/src/main/scala/io/iohk/ethereum/db/storage/AppStateStorage.scala index d0f5ac444e..c33230fda1 100644 --- a/src/main/scala/io/iohk/ethereum/db/storage/AppStateStorage.scala +++ b/src/main/scala/io/iohk/ethereum/db/storage/AppStateStorage.scala @@ -13,9 +13,12 @@ import io.iohk.ethereum.db.storage.AppStateStorage._ class AppStateStorage(val dataSource: DataSource) extends TransactionalKeyValueStorage[Key, Value] { val namespace: IndexedSeq[Byte] = Namespaces.AppStateNamespace - def keySerializer: Key => IndexedSeq[Byte] = _.getBytes - def valueSerializer: String => IndexedSeq[Byte] = _.getBytes - def valueDeserializer: IndexedSeq[Byte] => String = (valueBytes: IndexedSeq[Byte]) => new String(valueBytes.toArray) + def keySerializer: Key => IndexedSeq[Byte] = _.getBytes(StorageStringCharset.UTF8Charset) + + def keyDeserializer: IndexedSeq[Byte] => Key = k => new String(k.toArray, StorageStringCharset.UTF8Charset) + def valueSerializer: String => IndexedSeq[Byte] = _.getBytes(StorageStringCharset.UTF8Charset) + def valueDeserializer: IndexedSeq[Byte] => String = (valueBytes: IndexedSeq[Byte]) => + new String(valueBytes.toArray, StorageStringCharset.UTF8Charset) def getBestBlockNumber(): BigInt = getBigInt(Keys.BestBlockNumber) diff --git a/src/main/scala/io/iohk/ethereum/db/storage/BlockBodiesStorage.scala b/src/main/scala/io/iohk/ethereum/db/storage/BlockBodiesStorage.scala index 0559f426d5..2e3f7aea70 100644 --- a/src/main/scala/io/iohk/ethereum/db/storage/BlockBodiesStorage.scala +++ b/src/main/scala/io/iohk/ethereum/db/storage/BlockBodiesStorage.scala @@ -22,6 +22,8 @@ class BlockBodiesStorage(val dataSource: DataSource) extends TransactionalKeyVal override def keySerializer: BlockBodyHash => IndexedSeq[Byte] = _.toIndexedSeq + override def keyDeserializer: IndexedSeq[Byte] => BlockBodyHash = k => ByteString.fromArrayUnsafe(k.toArray) + override def valueSerializer: BlockBody => IndexedSeq[Byte] = blockBody => compactPickledBytes(Pickle.intoBytes(blockBody)) diff --git a/src/main/scala/io/iohk/ethereum/db/storage/BlockHeadersStorage.scala b/src/main/scala/io/iohk/ethereum/db/storage/BlockHeadersStorage.scala index e900ca9df6..230489499e 100644 --- a/src/main/scala/io/iohk/ethereum/db/storage/BlockHeadersStorage.scala +++ b/src/main/scala/io/iohk/ethereum/db/storage/BlockHeadersStorage.scala @@ -25,6 +25,8 @@ class BlockHeadersStorage(val dataSource: DataSource) override def keySerializer: BlockHeaderHash => IndexedSeq[Byte] = _.toIndexedSeq + override def keyDeserializer: IndexedSeq[Byte] => BlockHeaderHash = k => ByteString.fromArrayUnsafe(k.toArray) + override def valueSerializer: BlockHeader => IndexedSeq[Byte] = blockHeader => compactPickledBytes(Pickle.intoBytes(blockHeader)) diff --git 
a/src/main/scala/io/iohk/ethereum/db/storage/BlockNumberMappingStorage.scala b/src/main/scala/io/iohk/ethereum/db/storage/BlockNumberMappingStorage.scala index c32f8113cb..4437bf61d6 100644 --- a/src/main/scala/io/iohk/ethereum/db/storage/BlockNumberMappingStorage.scala +++ b/src/main/scala/io/iohk/ethereum/db/storage/BlockNumberMappingStorage.scala @@ -1,5 +1,7 @@ package io.iohk.ethereum.db.storage +import java.math.BigInteger + import akka.util.ByteString import io.iohk.ethereum.db.dataSource.DataSource import io.iohk.ethereum.db.storage.BlockHeadersStorage.BlockHeaderHash @@ -10,6 +12,8 @@ class BlockNumberMappingStorage(val dataSource: DataSource) override def keySerializer: (BigInt) => IndexedSeq[Byte] = index => index.toByteArray + override def keyDeserializer: IndexedSeq[Byte] => BigInt = bytes => new BigInt(new BigInteger(bytes.toArray)) + override def valueSerializer: (BlockHeaderHash) => IndexedSeq[Byte] = identity override def valueDeserializer: (IndexedSeq[Byte]) => BlockHeaderHash = arr => ByteString(arr.toArray[Byte]) diff --git a/src/main/scala/io/iohk/ethereum/db/storage/EvmCodeStorage.scala b/src/main/scala/io/iohk/ethereum/db/storage/EvmCodeStorage.scala index 538e7ebc1e..63a030aaf7 100644 --- a/src/main/scala/io/iohk/ethereum/db/storage/EvmCodeStorage.scala +++ b/src/main/scala/io/iohk/ethereum/db/storage/EvmCodeStorage.scala @@ -2,7 +2,9 @@ package io.iohk.ethereum.db.storage import akka.util.ByteString import io.iohk.ethereum.db.dataSource.DataSource +import io.iohk.ethereum.db.dataSource.RocksDbDataSource.IterationError import io.iohk.ethereum.db.storage.EvmCodeStorage._ +import monix.reactive.Observable /** * This class is used to store the EVM Code, by using: @@ -12,8 +14,16 @@ import io.iohk.ethereum.db.storage.EvmCodeStorage._ class EvmCodeStorage(val dataSource: DataSource) extends TransactionalKeyValueStorage[CodeHash, Code] { val namespace: IndexedSeq[Byte] = Namespaces.CodeNamespace def keySerializer: CodeHash => IndexedSeq[Byte] = identity + def keyDeserializer: IndexedSeq[Byte] => CodeHash = k => ByteString.fromArrayUnsafe(k.toArray) def valueSerializer: Code => IndexedSeq[Byte] = identity def valueDeserializer: IndexedSeq[Byte] => Code = (code: IndexedSeq[Byte]) => ByteString(code.toArray) + + // overriding to avoid going through IndexedSeq[Byte] + override def storageContent: Observable[Either[IterationError, (CodeHash, Code)]] = { + dataSource.iterate(namespace).map { result => + result.map { case (key, value) => (ByteString.fromArrayUnsafe(key), ByteString.fromArrayUnsafe(value)) } + } + } } object EvmCodeStorage { diff --git a/src/main/scala/io/iohk/ethereum/db/storage/FastSyncStateStorage.scala b/src/main/scala/io/iohk/ethereum/db/storage/FastSyncStateStorage.scala index 4eea243c42..e1523d45b1 100644 --- a/src/main/scala/io/iohk/ethereum/db/storage/FastSyncStateStorage.scala +++ b/src/main/scala/io/iohk/ethereum/db/storage/FastSyncStateStorage.scala @@ -33,7 +33,10 @@ class FastSyncStateStorage(val dataSource: DataSource) .addConcreteType[EvmCodeHash] .addConcreteType[StorageRootHash] - override def keySerializer: String => IndexedSeq[Byte] = _.getBytes + override def keySerializer: String => IndexedSeq[Byte] = _.getBytes(StorageStringCharset.UTF8Charset) + + override def keyDeserializer: IndexedSeq[Byte] => String = b => + new String(b.toArray, StorageStringCharset.UTF8Charset) override def valueSerializer: SyncState => IndexedSeq[Byte] = ss => compactPickledBytes(Pickle.intoBytes(ss)) diff --git 
a/src/main/scala/io/iohk/ethereum/db/storage/KeyValueStorage.scala b/src/main/scala/io/iohk/ethereum/db/storage/KeyValueStorage.scala index a6bfc3403b..2ea6fb7d84 100644 --- a/src/main/scala/io/iohk/ethereum/db/storage/KeyValueStorage.scala +++ b/src/main/scala/io/iohk/ethereum/db/storage/KeyValueStorage.scala @@ -1,13 +1,16 @@ package io.iohk.ethereum.db.storage import io.iohk.ethereum.common.SimpleMap +import io.iohk.ethereum.db.dataSource.RocksDbDataSource.IterationError import io.iohk.ethereum.db.dataSource.{DataSource, DataSourceUpdate} +import monix.reactive.Observable trait KeyValueStorage[K, V, T <: KeyValueStorage[K, V, T]] extends SimpleMap[K, V, T] { val dataSource: DataSource val namespace: IndexedSeq[Byte] def keySerializer: K => IndexedSeq[Byte] + def keyDeserializer: IndexedSeq[Byte] => K def valueSerializer: V => IndexedSeq[Byte] def valueDeserializer: IndexedSeq[Byte] => V @@ -42,4 +45,10 @@ trait KeyValueStorage[K, V, T <: KeyValueStorage[K, V, T]] extends SimpleMap[K, ) apply(dataSource) } + + def storageContent: Observable[Either[IterationError, (K, V)]] = { + dataSource.iterate(namespace).map { result => + result.map { case (key, value) => (keyDeserializer(key.toIndexedSeq), valueDeserializer(value)) } + } + } } diff --git a/src/main/scala/io/iohk/ethereum/db/storage/KnownNodesStorage.scala b/src/main/scala/io/iohk/ethereum/db/storage/KnownNodesStorage.scala index ad8964e7a1..903a623a67 100644 --- a/src/main/scala/io/iohk/ethereum/db/storage/KnownNodesStorage.scala +++ b/src/main/scala/io/iohk/ethereum/db/storage/KnownNodesStorage.scala @@ -12,10 +12,11 @@ class KnownNodesStorage(val dataSource: DataSource) extends TransactionalKeyValu val key = "KnownNodes" val namespace: IndexedSeq[Byte] = Namespaces.KnownNodesNamespace - def keySerializer: String => IndexedSeq[Byte] = _.getBytes - def valueSerializer: Set[String] => IndexedSeq[Byte] = _.mkString(" ").getBytes + def keySerializer: String => IndexedSeq[Byte] = _.getBytes(StorageStringCharset.UTF8Charset) + def keyDeserializer: IndexedSeq[Byte] => String = k => new String(k.toArray, StorageStringCharset.UTF8Charset) + def valueSerializer: Set[String] => IndexedSeq[Byte] = _.mkString(" ").getBytes(StorageStringCharset.UTF8Charset) def valueDeserializer: IndexedSeq[Byte] => Set[String] = (valueBytes: IndexedSeq[Byte]) => - new String(valueBytes.toArray).split(' ').toSet + new String(valueBytes.toArray, StorageStringCharset.UTF8Charset).split(' ').toSet def getKnownNodes(): Set[URI] = { get(key).getOrElse(Set.empty).filter(_.nonEmpty).map(new URI(_)) diff --git a/src/main/scala/io/iohk/ethereum/db/storage/NodeStorage.scala b/src/main/scala/io/iohk/ethereum/db/storage/NodeStorage.scala index a6fa347abe..aeafdbb80e 100644 --- a/src/main/scala/io/iohk/ethereum/db/storage/NodeStorage.scala +++ b/src/main/scala/io/iohk/ethereum/db/storage/NodeStorage.scala @@ -2,8 +2,10 @@ package io.iohk.ethereum.db.storage import akka.util.ByteString import io.iohk.ethereum.db.cache.Cache +import io.iohk.ethereum.db.dataSource.RocksDbDataSource.IterationError import io.iohk.ethereum.db.dataSource.{DataSource, DataSourceUpdateOptimized} import io.iohk.ethereum.db.storage.NodeStorage.{NodeEncoded, NodeHash} +import monix.reactive.Observable sealed trait NodesStorage extends { def get(key: NodeHash): Option[NodeEncoded] @@ -21,16 +23,12 @@ class NodeStorage(val dataSource: DataSource) with NodesStorage { val namespace: IndexedSeq[Byte] = Namespaces.NodeNamespace - private val specialNameSpace = namespace.head def keySerializer: NodeHash => 
IndexedSeq[Byte] = _.toIndexedSeq + def keyDeserializer: IndexedSeq[Byte] => NodeHash = h => ByteString(h.toArray) def valueSerializer: NodeEncoded => IndexedSeq[Byte] = _.toIndexedSeq def valueDeserializer: IndexedSeq[Byte] => NodeEncoded = _.toArray - def specialSerializer(nodeHash: NodeHash): Array[Byte] = { - (specialNameSpace +: nodeHash).toArray - } - - override def get(key: NodeHash): Option[NodeEncoded] = dataSource.getOptimized(specialSerializer(key)) + override def get(key: NodeHash): Option[NodeEncoded] = dataSource.getOptimized(namespace, key.toArray) /** * This function updates the KeyValueStorage by deleting, updating and inserting new (key-value) pairs @@ -45,14 +43,21 @@ class NodeStorage(val dataSource: DataSource) dataSource.update( Seq( DataSourceUpdateOptimized( - toRemove = toRemove.map(specialSerializer), - toUpsert = toUpsert.map(values => specialSerializer(values._1) -> values._2) + namespace = Namespaces.NodeNamespace, + toRemove = toRemove.map(_.toArray), + toUpsert = toUpsert.map(values => values._1.toArray -> values._2) ) ) ) apply(dataSource) } + override def storageContent: Observable[Either[IterationError, (NodeHash, NodeEncoded)]] = { + dataSource.iterate(namespace).map { result => + result.map { case (key, value) => (ByteString.fromArrayUnsafe(key), value) } + } + } + protected def apply(dataSource: DataSource): NodeStorage = new NodeStorage(dataSource) def updateCond(toRemove: Seq[NodeHash], toUpsert: Seq[(NodeHash, NodeEncoded)], inMemory: Boolean): NodesStorage = { diff --git a/src/main/scala/io/iohk/ethereum/db/storage/ReceiptStorage.scala b/src/main/scala/io/iohk/ethereum/db/storage/ReceiptStorage.scala index 0c39e37e22..e1d75fac57 100644 --- a/src/main/scala/io/iohk/ethereum/db/storage/ReceiptStorage.scala +++ b/src/main/scala/io/iohk/ethereum/db/storage/ReceiptStorage.scala @@ -23,6 +23,8 @@ class ReceiptStorage(val dataSource: DataSource) extends TransactionalKeyValueSt override def keySerializer: BlockHash => IndexedSeq[Byte] = _.toIndexedSeq + override def keyDeserializer: IndexedSeq[Byte] => BlockHash = k => ByteString.fromArrayUnsafe(k.toArray) + override def valueSerializer: ReceiptSeq => IndexedSeq[Byte] = receipts => compactPickledBytes(Pickle.intoBytes(receipts)) diff --git a/src/main/scala/io/iohk/ethereum/db/storage/StorageStringCharset.scala b/src/main/scala/io/iohk/ethereum/db/storage/StorageStringCharset.scala new file mode 100644 index 0000000000..342277eb21 --- /dev/null +++ b/src/main/scala/io/iohk/ethereum/db/storage/StorageStringCharset.scala @@ -0,0 +1,7 @@ +package io.iohk.ethereum.db.storage + +import java.nio.charset.Charset + +object StorageStringCharset { + val UTF8Charset = Charset.forName("UTF-8") +} diff --git a/src/main/scala/io/iohk/ethereum/db/storage/TotalDifficultyStorage.scala b/src/main/scala/io/iohk/ethereum/db/storage/TotalDifficultyStorage.scala index 8cdd85aaf2..fb11c7632e 100644 --- a/src/main/scala/io/iohk/ethereum/db/storage/TotalDifficultyStorage.scala +++ b/src/main/scala/io/iohk/ethereum/db/storage/TotalDifficultyStorage.scala @@ -13,6 +13,7 @@ class TotalDifficultyStorage(val dataSource: DataSource) extends TransactionalKeyValueStorage[BlockHash, TotalDifficulty] { val namespace: IndexedSeq[Byte] = Namespaces.TotalDifficultyNamespace def keySerializer: BlockHash => IndexedSeq[Byte] = _.toIndexedSeq + def keyDeserializer: IndexedSeq[Byte] => BlockHash = k => ByteString.fromArrayUnsafe(k.toArray) def valueSerializer: TotalDifficulty => IndexedSeq[Byte] = _.toByteArray.toIndexedSeq def valueDeserializer: 
IndexedSeq[Byte] => BigInt = (valueBytes: IndexedSeq[Byte]) => BigInt(1, valueBytes.toArray) } diff --git a/src/main/scala/io/iohk/ethereum/db/storage/TransactionMappingStorage.scala b/src/main/scala/io/iohk/ethereum/db/storage/TransactionMappingStorage.scala index 7f7c0cc76c..fe9ca7f7cd 100644 --- a/src/main/scala/io/iohk/ethereum/db/storage/TransactionMappingStorage.scala +++ b/src/main/scala/io/iohk/ethereum/db/storage/TransactionMappingStorage.scala @@ -13,6 +13,7 @@ class TransactionMappingStorage(val dataSource: DataSource) val namespace: IndexedSeq[Byte] = Namespaces.TransactionMappingNamespace def keySerializer: TxHash => IndexedSeq[Byte] = identity + def keyDeserializer: IndexedSeq[Byte] => TxHash = identity def valueSerializer: TransactionLocation => IndexedSeq[Byte] = tl => compactPickledBytes(Pickle.intoBytes(tl)) def valueDeserializer: IndexedSeq[Byte] => TransactionLocation = bytes => Unpickle[TransactionLocation].fromBytes(ByteBuffer.wrap(bytes.toArray[Byte])) diff --git a/src/main/scala/io/iohk/ethereum/db/storage/TransactionalKeyValueStorage.scala b/src/main/scala/io/iohk/ethereum/db/storage/TransactionalKeyValueStorage.scala index 63ffdec72e..2c9fb82ad0 100644 --- a/src/main/scala/io/iohk/ethereum/db/storage/TransactionalKeyValueStorage.scala +++ b/src/main/scala/io/iohk/ethereum/db/storage/TransactionalKeyValueStorage.scala @@ -1,6 +1,8 @@ package io.iohk.ethereum.db.storage +import io.iohk.ethereum.db.dataSource.RocksDbDataSource.IterationError import io.iohk.ethereum.db.dataSource.{DataSource, DataSourceBatchUpdate, DataSourceUpdate} +import monix.reactive.Observable /** * Represents transactional key value storage mapping keys of type K to values of type V @@ -14,6 +16,7 @@ trait TransactionalKeyValueStorage[K, V] { def keySerializer: K => IndexedSeq[Byte] def valueSerializer: V => IndexedSeq[Byte] def valueDeserializer: IndexedSeq[Byte] => V + def keyDeserializer: IndexedSeq[Byte] => K /** * This function obtains the associated value to a key in the current namespace, if there exists one. 
@@ -50,4 +53,10 @@ trait TransactionalKeyValueStorage[K, V] { def emptyBatchUpdate: DataSourceBatchUpdate = DataSourceBatchUpdate(dataSource, Array.empty) + + def storageContent: Observable[Either[IterationError, (K, V)]] = { + dataSource.iterate(namespace).map { result => + result.map { case (key, value) => (keyDeserializer(key.toIndexedSeq), valueDeserializer(value)) } + } + } } diff --git a/src/main/scala/io/iohk/ethereum/domain/Blockchain.scala b/src/main/scala/io/iohk/ethereum/domain/Blockchain.scala index b988c60c09..0d2b33958b 100644 --- a/src/main/scala/io/iohk/ethereum/domain/Blockchain.scala +++ b/src/main/scala/io/iohk/ethereum/domain/Blockchain.scala @@ -4,6 +4,7 @@ import java.util.concurrent.atomic.AtomicReference import akka.util.ByteString import io.iohk.ethereum.db.dataSource.DataSourceBatchUpdate +import io.iohk.ethereum.db.dataSource.RocksDbDataSource.IterationError import io.iohk.ethereum.db.storage.NodeStorage.{NodeEncoded, NodeHash} import io.iohk.ethereum.db.storage.StateStorage.RollBackFlush import io.iohk.ethereum.db.storage.TransactionMappingStorage.TransactionLocation @@ -14,6 +15,7 @@ import io.iohk.ethereum.domain.BlockchainImpl.BestBlockLatestCheckpointNumbers import io.iohk.ethereum.ledger.{InMemoryWorldStateProxy, InMemoryWorldStateProxyStorage} import io.iohk.ethereum.mpt.{MerklePatriciaTrie, MptNode} import io.iohk.ethereum.vm.{Storage, WorldStateProxy} +import monix.reactive.Observable import scala.annotation.tailrec @@ -195,6 +197,8 @@ trait Blockchain { ): WS def getStateStorage: StateStorage + + def mptStateSavedKeys(): Observable[Either[IterationError, ByteString]] } // scalastyle:on @@ -417,6 +421,11 @@ class BlockchainImpl( } // scalastyle:on method.length + def mptStateSavedKeys(): Observable[Either[IterationError, ByteString]] = { + (nodeStorage.storageContent.map(c => c.map(_._1)) ++ evmCodeStorage.storageContent.map(c => c.map(_._1))) + .takeWhileInclusive(_.isRight) + } + /** * Recursive function which try to find the previous checkpoint by traversing blocks from top to the bottom. 
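Reviewer note (not part of the diff): the pieces above are wired together as follows — `SyncStateScheduler.apply` builds a `LoadableBloomFilter` over `blockchain.mptStateSavedKeys()`, and `SyncStateSchedulerActor` kicks off `loadFilterFromBlockchain` on startup. A minimal usage sketch, assuming a `blockchain: BlockchainImpl` instance and the Monix global scheduler (both illustrative, not part of this change):

  import io.iohk.ethereum.blockchain.sync.SyncStateScheduler
  import monix.execution.Scheduler.Implicits.global

  // Build a scheduler whose bloom filter can be re-populated from the already
  // persisted MPT node and evm-code keys after a node restart.
  val stateScheduler = SyncStateScheduler(blockchain, expectedBloomFilterSize = 1000000)

  // Loading is an explicit, memoized Task: mptStateSavedKeys() stops at the first
  // Left (takeWhileInclusive(_.isRight)), so a failed load still reports how many
  // elements were written and which error ended the iteration.
  stateScheduler.loadFilterFromBlockchain.runToFuture.foreach { result =>
    println(s"bloom filter: ${result.writtenElements} keys loaded, error: ${result.error}")
  }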
* In case of finding the checkpoint block number, the function will finish the job and return result diff --git a/src/test/scala/io/iohk/ethereum/blockchain/sync/EphemBlockchainTestSetup.scala b/src/test/scala/io/iohk/ethereum/blockchain/sync/EphemBlockchainTestSetup.scala index 010d0dc64f..2227c78f33 100644 --- a/src/test/scala/io/iohk/ethereum/blockchain/sync/EphemBlockchainTestSetup.scala +++ b/src/test/scala/io/iohk/ethereum/blockchain/sync/EphemBlockchainTestSetup.scala @@ -7,7 +7,7 @@ import io.iohk.ethereum.nodebuilder.PruningConfigBuilder trait EphemBlockchainTestSetup extends ScenarioSetup { - sealed trait LocalPruningConfigBuilder extends PruningConfigBuilder { + trait LocalPruningConfigBuilder extends PruningConfigBuilder { override lazy val pruningMode: PruningMode = ArchivePruning } diff --git a/src/test/scala/io/iohk/ethereum/blockchain/sync/LoadableBloomFilterSpec.scala b/src/test/scala/io/iohk/ethereum/blockchain/sync/LoadableBloomFilterSpec.scala new file mode 100644 index 0000000000..38bed549b7 --- /dev/null +++ b/src/test/scala/io/iohk/ethereum/blockchain/sync/LoadableBloomFilterSpec.scala @@ -0,0 +1,55 @@ +package io.iohk.ethereum.blockchain.sync + +import com.google.common.hash.{Funnel, Funnels, PrimitiveSink} +import io.iohk.ethereum.FlatSpecBase +import io.iohk.ethereum.db.dataSource.RocksDbDataSource.IterationError +import monix.eval.Task +import monix.reactive.Observable + +class LoadableBloomFilterSpec extends FlatSpecBase { + implicit object LongFun extends Funnel[Long] { + override def funnel(from: Long, into: PrimitiveSink): Unit = { + Funnels.longFunnel().funnel(from, into) + } + } + + "LoadableBloomFilter" should "load all correct elements " in testCaseM { + for { + source <- Task(Observable.fromIterable(Seq(Right(1L), Right(2L), Right(3L)))) + filter = LoadableBloomFilter[Long](1000, source) + result <- filter.loadFromSource + } yield { + assert(result.writtenElements == 3) + assert(result.error.isEmpty) + assert(filter.approximateElementCount == 3) + } + } + + it should "load filter only once" in testCaseM[Task] { + for { + source <- Task(Observable.fromIterable(Seq(Right(1L), Right(2L), Right(3L)))) + filter = LoadableBloomFilter[Long](1000, source) + result <- filter.loadFromSource + result1 <- filter.loadFromSource + } yield { + assert(result.writtenElements == 3) + assert(result.error.isEmpty) + assert(filter.approximateElementCount == 3) + assert(result1 == result) + } + } + + it should "report last error if encountered" in testCaseM[Task] { + for { + error <- Task(IterationError(new RuntimeException("test"))) + source = Observable.fromIterable(Seq(Right(1L), Right(2L), Right(3L), Left(error))) + filter = LoadableBloomFilter[Long](1000, source) + result <- filter.loadFromSource + } yield { + assert(result.writtenElements == 3) + assert(result.error.contains(error)) + assert(filter.approximateElementCount == 3) + } + } + +} diff --git a/src/test/scala/io/iohk/ethereum/blockchain/sync/StateSyncSpec.scala b/src/test/scala/io/iohk/ethereum/blockchain/sync/StateSyncSpec.scala index 3acffeab86..da80684a0a 100644 --- a/src/test/scala/io/iohk/ethereum/blockchain/sync/StateSyncSpec.scala +++ b/src/test/scala/io/iohk/ethereum/blockchain/sync/StateSyncSpec.scala @@ -1,27 +1,33 @@ package io.iohk.ethereum.blockchain.sync import java.net.InetSocketAddress +import java.util.concurrent.ThreadLocalRandom import akka.actor.{ActorRef, ActorSystem} import akka.testkit.TestActor.AutoPilot import akka.testkit.{TestKit, TestProbe} -import 
io.iohk.ethereum.blockchain.sync.StateSyncUtils.TrieProvider +import akka.util.ByteString +import io.iohk.ethereum.blockchain.sync.StateSyncUtils.{MptNodeData, TrieProvider} import io.iohk.ethereum.blockchain.sync.SyncStateSchedulerActor.{ RestartRequested, StartSyncingTo, StateSyncFinished, WaitingForNewTargetBlock } -import io.iohk.ethereum.domain.BlockchainImpl +import io.iohk.ethereum.db.dataSource.RocksDbDataSource.IterationError +import io.iohk.ethereum.domain.{Address, BlockchainImpl} import io.iohk.ethereum.network.EtcPeerManagerActor.{GetHandshakedPeers, HandshakedPeers, PeerInfo, SendMessage} -import io.iohk.ethereum.network.{Peer, PeerId} import io.iohk.ethereum.network.PeerEventBusActor.PeerEvent.MessageFromPeer import io.iohk.ethereum.network.p2p.messages.CommonMessages.Status import io.iohk.ethereum.network.p2p.messages.PV63.GetNodeData.GetNodeDataEnc import io.iohk.ethereum.network.p2p.messages.PV63.NodeData import io.iohk.ethereum.network.p2p.messages.Versions +import io.iohk.ethereum.network.{Peer, PeerId} import io.iohk.ethereum.utils.Config import io.iohk.ethereum.{Fixtures, ObjectGenerators, WithActorSystemShutDown} +import monix.eval.Task +import monix.execution.Scheduler +import monix.reactive.Observable import org.scalactic.anyvals.PosInt import org.scalatest.BeforeAndAfterAll import org.scalatest.flatspec.AnyFlatSpecLike @@ -89,6 +95,47 @@ class StateSyncSpec } } + it should "start state sync when receiving start signal while bloom filter is loading" in new TestSetup() { + @volatile + var loadingFinished = false + val externalScheduler = Scheduler(system.dispatcher) + + override def buildBlockChain(): BlockchainImpl = { + val storages = getNewStorages.storages + + new BlockchainImpl( + blockHeadersStorage = storages.blockHeadersStorage, + blockBodiesStorage = storages.blockBodiesStorage, + blockNumberMappingStorage = storages.blockNumberMappingStorage, + receiptStorage = storages.receiptStorage, + evmCodeStorage = storages.evmCodeStorage, + pruningMode = storages.pruningMode, + nodeStorage = storages.nodeStorage, + cachedNodeStorage = storages.cachedNodeStorage, + totalDifficultyStorage = storages.totalDifficultyStorage, + transactionMappingStorage = storages.transactionMappingStorage, + appStateStorage = storages.appStateStorage, + stateStorage = storages.stateStorage + ) { + override def mptStateSavedKeys(): Observable[Either[IterationError, ByteString]] = { + Observable.repeatEvalF(Task(Right(ByteString(1)))).takeWhile(_ => !loadingFinished) + } + } + + } + val nodeData = (0 until 1000).map(i => MptNodeData(Address(i), None, Seq(), i)) + val initiator = TestProbe() + val trieProvider1 = TrieProvider() + val target = trieProvider1.buildWorld(nodeData) + setAutoPilotWithProvider(trieProvider1) + initiator.send(scheduler, StartSyncingTo(target, 1)) + externalScheduler.scheduleOnce(3.second) { + loadingFinished = true + } + + initiator.expectMsg(20.seconds, StateSyncFinished) + } + class TestSetup extends EphemBlockchainTestSetup with TestSyncConfig { override implicit lazy val system = actorSystem type PeerConfig = Map[PeerId, PeerAction] @@ -131,7 +178,7 @@ class StateSyncSpec } val maxMptNodeRequest = 50 - + val minMptNodeRequest = 20 val partialResponseConfig: PeerConfig = peersMap.map { case (peer, _) => peer.id -> PartialResponse } @@ -162,7 +209,8 @@ class StateSyncSpec sender ! 
MessageFromPeer(responseMsg, peer) this case PartialResponse => - val elementsToServe = Random.nextInt(maxMptNodeRequest) + val random: ThreadLocalRandom = ThreadLocalRandom.current() + val elementsToServe = random.nextInt(minMptNodeRequest, maxMptNodeRequest + 1) val toGet = msg.underlyingMsg.mptElementsHashes.toList.take(elementsToServe) val responseMsg = NodeData(trieProvider.getNodes(toGet).map(_.data)) sender ! MessageFromPeer(responseMsg, peer) @@ -194,12 +242,22 @@ class StateSyncSpec BlockchainImpl(getNewStorages.storages) } + def genRandomArray(): Array[Byte] = { + val arr = new Array[Byte](32) + Random.nextBytes(arr) + arr + } + + def genRandomByteString(): ByteString = { + ByteString.fromArrayUnsafe(genRandomArray()) + } + lazy val scheduler = system.actorOf( SyncStateSchedulerActor.props( downloader, - new SyncStateScheduler( + SyncStateScheduler( buildBlockChain(), - SyncStateScheduler.getEmptyFilter(syncConfig.stateSyncBloomFilterSize) + syncConfig.stateSyncBloomFilterSize ), syncConfig ) diff --git a/src/test/scala/io/iohk/ethereum/db/storage/KeyValueStorageSuite.scala b/src/test/scala/io/iohk/ethereum/db/storage/KeyValueStorageSuite.scala index 5580c8483f..80686013d5 100644 --- a/src/test/scala/io/iohk/ethereum/db/storage/KeyValueStorageSuite.scala +++ b/src/test/scala/io/iohk/ethereum/db/storage/KeyValueStorageSuite.scala @@ -25,6 +25,7 @@ class KeyValueStorageSuite extends AnyFunSuite with ScalaCheckPropertyChecks wit override val namespace: IndexedSeq[Byte] = intNamespace override def keySerializer: Int => IndexedSeq[Byte] = intSerializer + override def keyDeserializer: IndexedSeq[Byte] => Int = intDeserializer override def valueSerializer: Int => IndexedSeq[Byte] = intSerializer override def valueDeserializer: IndexedSeq[Byte] => Int = intDeserializer diff --git a/src/test/scala/io/iohk/ethereum/db/storage/TransactionalKeyValueStorageSuite.scala b/src/test/scala/io/iohk/ethereum/db/storage/TransactionalKeyValueStorageSuite.scala index 6805c5cb17..f9cd6a4f1c 100644 --- a/src/test/scala/io/iohk/ethereum/db/storage/TransactionalKeyValueStorageSuite.scala +++ b/src/test/scala/io/iohk/ethereum/db/storage/TransactionalKeyValueStorageSuite.scala @@ -23,6 +23,7 @@ class TransactionalKeyValueStorageSuite extends AnyFunSuite with ScalaCheckPrope override val namespace: IndexedSeq[Byte] = intNamespace override def keySerializer: Int => IndexedSeq[Byte] = intSerializer + override def keyDeserializer: IndexedSeq[Byte] => Int = intDeserializer override def valueSerializer: Int => IndexedSeq[Byte] = intSerializer override def valueDeserializer: IndexedSeq[Byte] => Int = intDeserializer }
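Reviewer note (not part of the diff): a minimal sketch of the new DataSource iteration API against the in-memory implementation; the setup values are illustrative only:

  import io.iohk.ethereum.db.dataSource.{DataSourceUpdateOptimized, EphemDataSource}
  import io.iohk.ethereum.db.storage.Namespaces
  import monix.execution.Scheduler.Implicits.global

  val db = new EphemDataSource(Map.empty)
  db.update(
    Seq(DataSourceUpdateOptimized(Namespaces.NodeNamespace, Seq(), Seq(Array[Byte](1) -> Array[Byte](42))))
  )

  // iterate(namespace) emits Either[IterationError, (key, value)] and completes once
  // the namespace is exhausted; RocksDbDataSource ends the stream after emitting the
  // first Left. Note that EphemDataSource emits keys with the namespace prefix still
  // attached, while RocksDbDataSource emits raw keys per column family.
  val keys = db.iterate(Namespaces.NodeNamespace).map(_.map(_._1.toSeq)).toListL
  println(keys.runSyncUnsafe())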