Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 57 additions & 27 deletions src/main/scala/io/iohk/ethereum/nodebuilder/StdNode.scala
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package io.iohk.ethereum.nodebuilder

import akka.actor.typed.ActorSystem

import scala.concurrent.Await
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.Failure
Expand All @@ -13,6 +15,8 @@ import io.iohk.ethereum.metrics.MetricsConfig
import io.iohk.ethereum.network.PeerManagerActor
import io.iohk.ethereum.network.ServerActor
import io.iohk.ethereum.network.discovery.PeerDiscoveryManager
import io.iohk.ethereum.nodebuilder.tooling.PeriodicConsistencyCheck
import io.iohk.ethereum.nodebuilder.tooling.StorageConsistencyChecker
import io.iohk.ethereum.utils.Config

/** A standard node is everything Ethereum prescribes except the mining algorithm,
Expand All @@ -24,28 +28,32 @@ import io.iohk.ethereum.utils.Config
* @see [[io.iohk.ethereum.nodebuilder.Node Node]]
*/
abstract class BaseNode extends Node {
private[this] def loadGenesisData(): Unit =
if (!Config.testmode) genesisDataLoader.loadGenesisData()

private[this] def startPeerManager(): Unit = peerManager ! PeerManagerActor.StartConnecting
def start(): Unit = {
startMetricsClient()

private[this] def startServer(): Unit = server ! ServerActor.StartServer(networkConfig.Server.listenAddress)
loadGenesisData()

private[this] def startSyncController(): Unit = syncController ! SyncProtocol.Start
runDBConsistencyCheck()

private[this] def startMining(): Unit = mining.startProtocol(this)
startPeerManager()

private[this] def startDiscoveryManager(): Unit = peerDiscoveryManager ! PeerDiscoveryManager.Start
startPortForwarding()

private[this] def startJsonRpcHttpServer(): Unit =
maybeJsonRpcHttpServer match {
case Right(jsonRpcServer) if jsonRpcConfig.httpServerConfig.enabled => jsonRpcServer.run()
case Left(error) if jsonRpcConfig.httpServerConfig.enabled => log.error(error)
case _ => //Nothing
}
startServer()

private[this] def startJsonRpcIpcServer(): Unit =
if (jsonRpcConfig.ipcServerConfig.enabled) jsonRpcIpcServer.run()
startSyncController()

startMining()

startDiscoveryManager()

startJsonRpcHttpServer()

startJsonRpcIpcServer()

startPeriodicDBConsistencyCheck()
}

private[this] def startMetricsClient(): Unit = {
val metricsConfig = MetricsConfig(Config.config)
Expand All @@ -56,25 +64,47 @@ abstract class BaseNode extends Node {
}
}

def start(): Unit = {
startMetricsClient()
private[this] def loadGenesisData(): Unit =
if (!Config.testmode) genesisDataLoader.loadGenesisData()

loadGenesisData()
private[this] def runDBConsistencyCheck(): Unit =
StorageConsistencyChecker.checkStorageConsistency(
storagesInstance.storages.appStateStorage.getBestBlockNumber(),
storagesInstance.storages.blockNumberMappingStorage,
storagesInstance.storages.blockHeadersStorage,
shutdown
)(log)

startPeerManager()
private[this] def startPeerManager(): Unit = peerManager ! PeerManagerActor.StartConnecting

startPortForwarding()
startServer()
private[this] def startServer(): Unit = server ! ServerActor.StartServer(networkConfig.Server.listenAddress)

startSyncController()
private[this] def startSyncController(): Unit = syncController ! SyncProtocol.Start

startMining()
private[this] def startMining(): Unit = mining.startProtocol(this)

startDiscoveryManager()
private[this] def startDiscoveryManager(): Unit = peerDiscoveryManager ! PeerDiscoveryManager.Start

startJsonRpcHttpServer()
startJsonRpcIpcServer()
}
private[this] def startJsonRpcHttpServer(): Unit =
maybeJsonRpcHttpServer match {
case Right(jsonRpcServer) if jsonRpcConfig.httpServerConfig.enabled => jsonRpcServer.run()
case Left(error) if jsonRpcConfig.httpServerConfig.enabled => log.error(error)
case _ => //Nothing
}

private[this] def startJsonRpcIpcServer(): Unit =
if (jsonRpcConfig.ipcServerConfig.enabled) jsonRpcIpcServer.run()

def startPeriodicDBConsistencyCheck(): Unit =
ActorSystem(
PeriodicConsistencyCheck.start(
storagesInstance.storages.appStateStorage,
storagesInstance.storages.blockNumberMappingStorage,
storagesInstance.storages.blockHeadersStorage,
shutdown
),
"PeriodicDBConsistencyCheck"
)

override def shutdown(): Unit = {
def tryAndLogFailure(f: () => Any): Unit = Try(f()) match {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package io.iohk.ethereum.nodebuilder.tooling

import akka.actor.typed.Behavior
import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.scaladsl.TimerScheduler

import scala.concurrent.duration.DurationInt

import io.iohk.ethereum.db.storage.AppStateStorage
import io.iohk.ethereum.db.storage.BlockHeadersStorage
import io.iohk.ethereum.db.storage.BlockNumberMappingStorage
import io.iohk.ethereum.nodebuilder.tooling.PeriodicConsistencyCheck.ConsistencyCheck
import io.iohk.ethereum.utils.Logger

object PeriodicConsistencyCheck {
def start(
appStateStorage: AppStateStorage,
blockNumberMappingStorage: BlockNumberMappingStorage,
blockHeadersStorage: BlockHeadersStorage,
shutdown: () => Unit
): Behavior[ConsistencyCheck] =
Behaviors.withTimers { timers =>
tick(timers)
PeriodicConsistencyCheck(timers, appStateStorage, blockNumberMappingStorage, blockHeadersStorage, shutdown)
.check()
}

sealed trait ConsistencyCheck extends Product with Serializable
case object Tick extends ConsistencyCheck

def tick(timers: TimerScheduler[ConsistencyCheck]): Unit =
timers.startSingleTimer(Tick, 10.minutes)
}

case class PeriodicConsistencyCheck(
timers: TimerScheduler[ConsistencyCheck],
appStateStorage: AppStateStorage,
blockNumberMappingStorage: BlockNumberMappingStorage,
blockHeadersStorage: BlockHeadersStorage,
shutdown: () => Unit
) extends Logger {
import PeriodicConsistencyCheck._

def check(): Behavior[ConsistencyCheck] = Behaviors.receiveMessage { case Tick =>
log.debug("Running a storageConsistency check")
StorageConsistencyChecker.checkStorageConsistency(
appStateStorage.getBestBlockNumber(),
blockNumberMappingStorage,
blockHeadersStorage,
shutdown
)(log)
tick(timers)
Behaviors.same
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package io.iohk.ethereum.nodebuilder.tooling

import com.typesafe.scalalogging.Logger

import io.iohk.ethereum.db.storage.BlockHeadersStorage
import io.iohk.ethereum.db.storage.BlockNumberMappingStorage

object StorageConsistencyChecker {
type ShutdownOp = () => Unit

val DefaultStep = 1000

def checkStorageConsistency(
bestBlockNumber: BigInt,
blockNumberMappingStorage: BlockNumberMappingStorage,
blockHeadersStorage: BlockHeadersStorage,
shutdown: ShutdownOp,
step: Int = DefaultStep
)(implicit log: Logger): Unit =
Range(0, bestBlockNumber.intValue, step).foreach { idx =>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm wondering if it makes sense to check every 1000th block. I imagine a case where the check passes but the db is inconsistent.
Perhaps it would be worthwhile to occasionally check the whole range?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is a first pass. I want to start identifying the circumstances when clients loose consistency.

(for {
hash <- blockNumberMappingStorage.get(idx)
_ <- blockHeadersStorage.get(hash)
} yield ()).fold {
log.error("Database seems to be in inconsistent state, shutting down")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't we include information on how the user should act when this happens? I guess how to delete the DB?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this a good place to add such documentation for the client? Not convinced, I think we have a mantis docs project.
We don't have a policy for such situations.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In that case advising the user to check the website. But I think that saying nothing is the worse option :)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should log the missing block's hash and number. I'm not sure it would really helps us when this happens, but it does not hurt.

shutdown()
}(_ => ())
}
}