Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions src/main/resources/conf/base.conf
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,15 @@ mantis {
peer-manager-timeout = 5.seconds
}

health {
# If the best known block number stays the same for more time than this,
# the healthcheck will consider the client to be stuck and return an error
no-update-duration-threshold = 30.minutes
# If the difference between the best stored block number and the best known block number
# is less than this value, the healthcheck will report that the client is synced.
syncing-status-threshold = 10
}

miner-active-timeout = 5.seconds
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,10 @@ class FaucetJsonRpcHealthCheck(faucetRpcService: FaucetRpcService) extends JsonR

protected def mainService: String = "faucet health"

final val statusHC = JsonRpcHealthcheck("status", faucetRpcService.status(StatusRequest()))
final val statusHC = JsonRpcHealthcheck.fromServiceResponse("status", faucetRpcService.status(StatusRequest()))

override def healthCheck(): Task[HealthcheckResponse] = {
val statusF = statusHC()
val statusF = statusHC.map(_.toResult)
val responseF = statusF.map(check => HealthcheckResponse(List(check)))

handleResponse(responseF)
Expand Down
34 changes: 0 additions & 34 deletions src/main/scala/io/iohk/ethereum/healthcheck/Healthcheck.scala

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,18 +1,27 @@
package io.iohk.ethereum.healthcheck

final case class HealthcheckResult private (description: String, status: String, error: Option[String]) {
assert(
status == HealthcheckStatus.OK && error.isEmpty || status == HealthcheckStatus.ERROR && error.isDefined
)
final case class HealthcheckResult private (
name: String,
status: String,
info: Option[String]
) {

def isOK: Boolean = status == HealthcheckStatus.OK
}

object HealthcheckResult {
def apply(description: String, error: Option[String]): HealthcheckResult =

def ok(name: String, info: Option[String] = None): HealthcheckResult =
new HealthcheckResult(
name = name,
status = HealthcheckStatus.OK,
info = info
)

def error(name: String, error: String): HealthcheckResult =
new HealthcheckResult(
description = description,
status = error.fold(HealthcheckStatus.OK)(_ => HealthcheckStatus.ERROR),
error = error
name = name,
status = HealthcheckStatus.ERROR,
info = Some(error)
)
}
48 changes: 45 additions & 3 deletions src/main/scala/io/iohk/ethereum/jsonrpc/JsonRpcHealthcheck.scala
Original file line number Diff line number Diff line change
@@ -1,9 +1,51 @@
package io.iohk.ethereum.jsonrpc

import io.iohk.ethereum.healthcheck.Healthcheck
import io.iohk.ethereum.healthcheck.HealthcheckResult
import monix.eval.Task

final case class JsonRpcHealthcheck[Response](
name: String,
healthCheck: Either[String, Response],
info: Option[String] = None
) {

def toResult: HealthcheckResult = {
healthCheck
.fold(
HealthcheckResult.error(name, _),
result => HealthcheckResult.ok(name, info)
)
}

def withPredicate(message: String)(predicate: Response => Boolean): JsonRpcHealthcheck[Response] =
copy(healthCheck = healthCheck.filterOrElse(predicate, message))

def collect[T](message: String)(collectFn: PartialFunction[Response, T]): JsonRpcHealthcheck[T] =
copy(
name = name,
healthCheck = healthCheck.flatMap(collectFn.lift(_).toRight(message))
)

def withInfo(getInfo: Response => String): JsonRpcHealthcheck[Response] =
copy(info = healthCheck.toOption.map(getInfo))
}

object JsonRpcHealthcheck {
type T[R] = Healthcheck[JsonRpcError, R]

def apply[R](description: String, f: ServiceResponse[R]): T[R] = Healthcheck(description, f)
def fromServiceResponse[Response](name: String, f: ServiceResponse[Response]): Task[JsonRpcHealthcheck[Response]] =
f.map(result =>
JsonRpcHealthcheck(
name,
result.left.map[String](_.message)
)
).onErrorHandle(t => JsonRpcHealthcheck(name, Left(t.getMessage())))

def fromTask[Response](name: String, f: Task[Response]): Task[JsonRpcHealthcheck[Response]] =
f.map(result =>
JsonRpcHealthcheck(
name,
Right(result)
)
).onErrorHandle(t => JsonRpcHealthcheck(name, Left(t.getMessage())))

}
155 changes: 131 additions & 24 deletions src/main/scala/io/iohk/ethereum/jsonrpc/NodeJsonRpcHealthChecker.scala
Original file line number Diff line number Diff line change
@@ -1,43 +1,150 @@
package io.iohk.ethereum.jsonrpc

import io.iohk.ethereum.healthcheck.HealthcheckResponse
import io.iohk.ethereum.jsonrpc.EthBlocksService.BlockByNumberRequest
import io.iohk.ethereum.jsonrpc.EthBlocksService.{
BlockByNumberRequest,
BlockByNumberResponse,
BestBlockNumberRequest,
BestBlockNumberResponse
}
import io.iohk.ethereum.jsonrpc.EthInfoService._
import io.iohk.ethereum.jsonrpc.NodeJsonRpcHealthChecker.JsonRpcHealthConfig
import io.iohk.ethereum.jsonrpc.NetService._
import io.iohk.ethereum.jsonrpc.AkkaTaskOps._
import com.typesafe.config.{Config => TypesafeConfig}
import monix.eval.Task
import java.time.Instant
import java.time.Duration
import akka.actor.ActorRef
import io.iohk.ethereum.blockchain.sync.SyncProtocol
import io.iohk.ethereum.blockchain.sync.SyncProtocol.Status._
import akka.util.Timeout
import io.iohk.ethereum.utils.AsyncConfig

class NodeJsonRpcHealthChecker(
netService: NetService,
ethBlocksService: EthBlocksService
ethBlocksService: EthBlocksService,
syncingController: ActorRef,
config: JsonRpcHealthConfig,
asyncConfig: AsyncConfig
) extends JsonRpcHealthChecker {

implicit val askTimeout: Timeout = asyncConfig.askTimeout

protected def mainService: String = "node health"

final val listeningHC = JsonRpcHealthcheck("listening", netService.listening(NetService.ListeningRequest()))
final val peerCountHC = JsonRpcHealthcheck("peerCount", netService.peerCount(PeerCountRequest()))
final val earliestBlockHC = JsonRpcHealthcheck(
"earliestBlock",
ethBlocksService.getBlockByNumber(BlockByNumberRequest(BlockParam.Earliest, fullTxs = true))
)
final val latestBlockHC = JsonRpcHealthcheck(
"latestBlock",
ethBlocksService.getBlockByNumber(BlockByNumberRequest(BlockParam.Latest, fullTxs = true))
)
final val pendingBlockHC = JsonRpcHealthcheck(
"pendingBlock",
ethBlocksService.getBlockByNumber(BlockByNumberRequest(BlockParam.Pending, fullTxs = true))
)
private var previousBestFetchingBlock: Option[(Instant, BigInt)] = None

private val peerCountHC = JsonRpcHealthcheck
.fromServiceResponse("peerCount", netService.peerCount(PeerCountRequest()))
.map(
_.withInfo(_.value.toString)
.withPredicate("peer count is 0")(_.value > 0)
)

private val storedBlockHC = JsonRpcHealthcheck
.fromServiceResponse(
"bestStoredBlock",
ethBlocksService.getBlockByNumber(BlockByNumberRequest(BlockParam.Latest, fullTxs = true))
)
.map(
_.collect("No block is currently stored") { case EthBlocksService.BlockByNumberResponse(Some(v)) => v }
.withInfo(_.number.toString)
)

private val bestKnownBlockHC = JsonRpcHealthcheck
.fromServiceResponse("bestKnownBlock", getBestKnownBlockTask)
.map(_.withInfo(_.toString))

private val fetchingBlockHC = JsonRpcHealthcheck
.fromServiceResponse("bestFetchingBlock", getBestFetchingBlockTask)
.map(
_.collect("no best fetching block") { case Some(v) => v }
.withInfo(_.toString)
)

private val updateStatusHC = JsonRpcHealthcheck
.fromServiceResponse("updateStatus", getBestFetchingBlockTask)
.map(
_.collect("no best fetching block") { case Some(v) => v }
.withPredicate(s"block did not change for more than ${config.noUpdateDurationThreshold.getSeconds()} s")(
blockNumberHasChanged
)
)

private val syncStatusHC =
JsonRpcHealthcheck
.fromTask("syncStatus", syncingController.askFor[SyncProtocol.Status](SyncProtocol.GetStatus))
.map(_.withInfo {
case NotSyncing => "STARTING"
case s: Syncing if isConsideredSyncing(s.blocksProgress) => "SYNCING"
case _ => "SYNCED"
})

override def healthCheck(): Task[HealthcheckResponse] = {
val listeningF = listeningHC()
val peerCountF = peerCountHC()
val earliestBlockF = earliestBlockHC()
val latestBlockF = latestBlockHC()
val pendingBlockF = pendingBlockHC()
val responseTask = Task
.parSequence(
List(
peerCountHC,
storedBlockHC,
bestKnownBlockHC,
fetchingBlockHC,
updateStatusHC,
syncStatusHC
)
)
.map(_.map(_.toResult))
.map(HealthcheckResponse)

handleResponse(responseTask)
}

private def blockNumberHasChanged(newBestFetchingBlock: BigInt) =
previousBestFetchingBlock match {
case Some((firstSeenAt, value)) if value == newBestFetchingBlock =>
Instant.now().minus(config.noUpdateDurationThreshold).isBefore(firstSeenAt)
case _ =>
previousBestFetchingBlock = Some((Instant.now(), newBestFetchingBlock))
true
}

/** Try to fetch best block number from the sync controller or fallback to ethBlocksService */
private def getBestKnownBlockTask =
syncingController
.askFor[SyncProtocol.Status](SyncProtocol.GetStatus)
.flatMap {
case NotSyncing | SyncDone =>
ethBlocksService
.bestBlockNumber(EthBlocksService.BestBlockNumberRequest())
.map(_.map(_.bestBlockNumber))
case Syncing(_, progress, _) => Task.now(Right(progress.target))
}

/** Try to fetch best fetching number from the sync controller or fallback to ethBlocksService */
private def getBestFetchingBlockTask =
syncingController
.askFor[SyncProtocol.Status](SyncProtocol.GetStatus)
.flatMap {
case NotSyncing | SyncDone =>
ethBlocksService
.getBlockByNumber(BlockByNumberRequest(BlockParam.Pending, fullTxs = true))
.map(_.map(_.blockResponse.map(_.number)))
case Syncing(_, progress, _) => Task.now(Right(Some(progress.current)))
}

private def isConsideredSyncing(progress: Progress) =
progress.target - progress.current > config.syncingStatusThreshold

}

val allChecksF = List(listeningF, peerCountF, earliestBlockF, latestBlockF, pendingBlockF)
val responseF = Task.sequence(allChecksF).map(HealthcheckResponse)
object NodeJsonRpcHealthChecker {
case class JsonRpcHealthConfig(noUpdateDurationThreshold: Duration, syncingStatusThreshold: Int)

handleResponse(responseF)
object JsonRpcHealthConfig {
def apply(rpcConfig: TypesafeConfig): JsonRpcHealthConfig =
JsonRpcHealthConfig(
noUpdateDurationThreshold = rpcConfig.getDuration("health.no-update-duration-threshold"),
syncingStatusThreshold = rpcConfig.getInt("health.syncing-status-threshold")
)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import io.iohk.ethereum.jsonrpc.serialization.{JsonEncoder, JsonMethodDecoder}
import io.iohk.ethereum.jsonrpc.server.http.JsonRpcHttpServer.JsonRpcHttpServerConfig
import io.iohk.ethereum.jsonrpc.server.ipc.JsonRpcIpcServer.JsonRpcIpcServerConfig
import io.iohk.ethereum.jsonrpc.{JsonRpcControllerMetrics, JsonRpcError, JsonRpcRequest, JsonRpcResponse}
import io.iohk.ethereum.jsonrpc.NodeJsonRpcHealthChecker.JsonRpcHealthConfig
import io.iohk.ethereum.utils.Logger
import monix.eval.Task
import org.json4s.JsonDSL._
Expand Down Expand Up @@ -120,6 +121,7 @@ object JsonRpcBaseController {
def minerActiveTimeout: FiniteDuration
def httpServerConfig: JsonRpcHttpServerConfig
def ipcServerConfig: JsonRpcIpcServerConfig
def healthConfig: JsonRpcHealthConfig
}

object JsonRpcConfig {
Expand All @@ -143,6 +145,7 @@ object JsonRpcBaseController {

override val httpServerConfig: JsonRpcHttpServerConfig = JsonRpcHttpServerConfig(mantisConfig)
override val ipcServerConfig: JsonRpcIpcServerConfig = JsonRpcIpcServerConfig(mantisConfig)
override val healthConfig: JsonRpcHealthConfig = JsonRpcHealthConfig(rpcConfig)
}
}
}
Expand Down
15 changes: 13 additions & 2 deletions src/main/scala/io/iohk/ethereum/nodebuilder/NodeBuilder.scala
Original file line number Diff line number Diff line change
Expand Up @@ -589,8 +589,19 @@ trait JSONRpcControllerBuilder {
}

trait JSONRpcHealthcheckerBuilder {
this: NetServiceBuilder with EthBlocksServiceBuilder =>
lazy val jsonRpcHealthChecker: JsonRpcHealthChecker = new NodeJsonRpcHealthChecker(netService, ethBlocksService)
this: NetServiceBuilder
with EthBlocksServiceBuilder
with JSONRpcConfigBuilder
with AsyncConfigBuilder
with SyncControllerBuilder =>
lazy val jsonRpcHealthChecker: JsonRpcHealthChecker =
new NodeJsonRpcHealthChecker(
netService,
ethBlocksService,
syncController,
jsonRpcConfig.healthConfig,
asyncConfig
)
}

trait JSONRpcHttpServerBuilder {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ class JsonRpcControllerSpec
override def minerActiveTimeout: FiniteDuration = ???
override def httpServerConfig: JsonRpcHttpServer.JsonRpcHttpServerConfig = ???
override def ipcServerConfig: JsonRpcIpcServer.JsonRpcIpcServerConfig = ???
override def healthConfig: NodeJsonRpcHealthChecker.JsonRpcHealthConfig = ???
}

val ethRpcRequest = newJsonRpcRequest("eth_protocolVersion")
Expand Down
Loading