Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
44 commits
Select commit Hold shift + click to select a range
fe0b895
[ETCM-269] port Future to monix Task
Oct 21, 2020
6335aac
[ETCM-269] tests port Future to monix Task
Oct 22, 2020
1631143
[ETCM-269] port tests Future to Task in JSON-RPC 2
Oct 22, 2020
9cd0895
[ETCM-269] merge develop
Oct 23, 2020
dfaf173
Merge branch 'develop' into feature/ETCM_269_future_task
Oct 23, 2020
2c63426
[ETCM-269] extract TaskAsk pattern
Oct 23, 2020
2e1e29f
[ETCM-269] scalafmt on compile
Oct 23, 2020
bb1d7b8
[ETCM-269] add 60 min timeout on build
Oct 23, 2020
95ae5b6
[ETCM-269] scalafmt
Oct 23, 2020
effc77b
[ETCM-269] add AkkaAsk in DebugService
Oct 23, 2020
e2960da
[ETCM-269] improve patience
Oct 23, 2020
5bac969
[ETCM-269] buildkite pipeline
Oct 23, 2020
4fb7152
[ETC-269] cleanup actor systems and monix scheduler in tests to avoid…
Oct 23, 2020
9b798df
[ETC-269] cleanup actor systems and monix scheduler in tests to avoid…
Oct 23, 2020
5b8a5c4
[ETCM-126] set explicit timeouts for runSyncUnsafe that is how Future…
Oct 26, 2020
a605089
[ETCM-126] merge develop
Oct 26, 2020
90f7761
[ETCM-126] fix after merge
Oct 26, 2020
00f31d3
[ETCM-126] partial Future and Task implementation to keep original ex…
Oct 26, 2020
442d187
[ETCM-126] partial Future and Task implementation to keep original ex…
Oct 26, 2020
37aa010
[ETCM-269] drop unused imports
Oct 26, 2020
4b1aa9c
[ETCM-269] revert rename
Oct 26, 2020
dab9e18
Merge branch 'develop' into feature/ETCM_269_future_task
lemastero Oct 27, 2020
789186c
[ETCM-269] replace Task.fromFuture + Future to just Task in getFedera…
Oct 27, 2020
a8051fd
[ETCM-269] replace Task.fromFuture + Future with Task in QaService
Oct 27, 2020
e074b15
[ETCM-269] replace Future + Task with Task in DebugService
Oct 27, 2020
4500bce
[ETCM-269] replace Future + Task with Task in Checkpointing
Oct 27, 2020
e6d923f
[ETCM-269] replace Future + Task with Task in PersonalService
Oct 27, 2020
b463d18
Merge branch 'develop' into feature/ETCM_269_future_task
lemastero Oct 28, 2020
74ad2e2
Merge branch 'feature/ETCM_269_future_task' of github.com:input-outpu…
Oct 28, 2020
a20cc7b
[ETCM-269] review remove format on compile
Oct 28, 2020
f4ae082
[ETCM-269] replace () => Task in Healthcheck
Oct 29, 2020
18adc35
[ETCM-269] review remove () => Task in TestService
Oct 29, 2020
8388abd
[ETCM-269] review remove self type in AkkaTaskOps and changed to valu…
Oct 29, 2020
c50fa6c
[ETCM-269] merge from develop
Oct 29, 2020
36c381f
[ETCM-269] resolve issues after merge
Oct 29, 2020
2ccb7f9
[ETCM-269] WIP change PersonalService
Oct 29, 2020
cee6d27
[ETCM-269] change PersonalService
Oct 30, 2020
4604581
[ETCM-269] merge from develop
Oct 30, 2020
490f30a
[ETCM-269] removed unneccessary rename of global monix Scheduler
Oct 30, 2020
86fec48
Merge branch 'develop' into feature/ETCM_269_future_task
lemastero Nov 3, 2020
c823b96
Merge branch 'develop' into feature/ETCM_269_future_task
lemastero Nov 3, 2020
ef3c041
[ETCM-269] fix after merge from master
Nov 3, 2020
7494676
[ETCM-269] review: AkkaAskTask using defereFuture + fix tests
Nov 3, 2020
24cec0f
Merge branch 'develop' into feature/ETCM_269_future_task
Nov 3, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .buildkite/pipeline.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,5 @@ steps:
nix eval --json '(import ./.buildkite { pipeline = ./.buildkite/pipeline.nix; })' \
| buildkite-agent pipeline upload --no-interpolation
agents:
queue: project42
queue: project42
timeout_in_minutes: 60
21 changes: 9 additions & 12 deletions src/main/scala/io/iohk/ethereum/healthcheck/Healthcheck.scala
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package io.iohk.ethereum.healthcheck

import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success}
import monix.eval.Task

/**
* Represents a health check, runs it and interprets the outcome.
Expand All @@ -18,20 +17,18 @@ import scala.util.{Failure, Success}
*/
case class Healthcheck[Error, Result](
description: String,
f: () => Future[Either[Error, Result]],
f: Task[Either[Error, Result]],
mapResultToError: Result => Option[String] = (_: Result) => None,
mapErrorToError: Error => Option[String] = (error: Error) => Some(String.valueOf(error)),
mapExceptionToError: Throwable => Option[String] = (t: Throwable) => Some(String.valueOf(t))
) {

def apply()(implicit ec: ExecutionContext): Future[HealthcheckResult] = {
f().transform {
case Success(Left(error)) =>
Success(HealthcheckResult(description, mapErrorToError(error)))
case Success(Right(result)) =>
Success(HealthcheckResult(description, mapResultToError(result)))
case Failure(t) =>
Success(HealthcheckResult(description, mapExceptionToError(t)))
}
def apply(): Task[HealthcheckResult] = {
f.map {
case Left(error) =>
HealthcheckResult(description, mapErrorToError(error))
case Right(result) =>
HealthcheckResult(description, mapResultToError(result))
}.onErrorHandle(t => HealthcheckResult(description, mapExceptionToError(t)))
}
}
20 changes: 20 additions & 0 deletions src/main/scala/io/iohk/ethereum/jsonrpc/AkkaTaskOps.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package io.iohk.ethereum.jsonrpc

import akka.actor.{Actor, ActorRef}
import akka.pattern.ask
import akka.util.Timeout
import monix.eval.Task

import scala.reflect.ClassTag

object AkkaTaskOps {
implicit class TaskActorOps(val to: ActorRef) extends AnyVal {

def askFor[A](
message: Any
)(implicit timeout: Timeout, classTag: ClassTag[A], sender: ActorRef = Actor.noSender): Task[A] =
Task
.deferFuture((to ? message).mapTo[A])
.timeout(timeout.duration)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,7 @@ import io.iohk.ethereum.blockchain.sync.regular.RegularSync.NewCheckpoint
import io.iohk.ethereum.crypto.ECDSASignature
import io.iohk.ethereum.domain.Blockchain
import io.iohk.ethereum.utils.Logger
import monix.execution.Scheduler.Implicits.global

import scala.concurrent.Future
import monix.eval.Task

class CheckpointingService(
blockchain: Blockchain,
Expand All @@ -21,12 +19,12 @@ class CheckpointingService(
lazy val bestBlockNum = blockchain.getBestBlockNumber()
lazy val blockToReturnNum = bestBlockNum - bestBlockNum % req.checkpointingInterval

Future {
Task {
blockchain.getBlockByNumber(blockToReturnNum)
}.flatMap {
case Some(b) =>
val resp = GetLatestBlockResponse(b.hash, b.number)
Future.successful(Right(resp))
Task.now(Right(resp))

case None =>
log.error(
Expand All @@ -37,7 +35,7 @@ class CheckpointingService(
}
}

def pushCheckpoint(req: PushCheckpointRequest): ServiceResponse[PushCheckpointResponse] = Future {
def pushCheckpoint(req: PushCheckpointRequest): ServiceResponse[PushCheckpointResponse] = Task {
syncController ! NewCheckpoint(req.hash, req.signatures)
Right(PushCheckpointResponse())
}
Expand Down
31 changes: 13 additions & 18 deletions src/main/scala/io/iohk/ethereum/jsonrpc/DebugService.scala
Original file line number Diff line number Diff line change
@@ -1,49 +1,44 @@
package io.iohk.ethereum.jsonrpc

import akka.actor.ActorRef
import akka.pattern._
import akka.util.Timeout
import io.iohk.ethereum.jsonrpc.AkkaTaskOps._
import io.iohk.ethereum.jsonrpc.DebugService.{ListPeersInfoRequest, ListPeersInfoResponse}
import io.iohk.ethereum.network.EtcPeerManagerActor.{PeerInfo, PeerInfoResponse}
import io.iohk.ethereum.network.PeerManagerActor.Peers
import io.iohk.ethereum.network.{EtcPeerManagerActor, Peer, PeerActor, PeerId, PeerManagerActor}
import monix.eval.Task

import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future
import scala.concurrent.duration._

object DebugService {

case class ListPeersInfoRequest()
case class ListPeersInfoResponse(peers: List[PeerInfo])

}

class DebugService(peerManager: ActorRef, etcPeerManager: ActorRef) {

def listPeersInfo(getPeersInfoRequest: ListPeersInfoRequest): ServiceResponse[ListPeersInfoResponse] = {
val result = for {
for {
ids <- getPeerIds
peers <- Future.traverse(ids)(getPeerInfo)
} yield ListPeersInfoResponse(peers.flatten)

result.map(Right(_))
peers <- Task.traverse(ids)(getPeerInfo)
} yield Right(ListPeersInfoResponse(peers.flatten))
}

private def getPeerIds: Future[List[PeerId]] = {
private def getPeerIds: Task[List[PeerId]] = {
implicit val timeout: Timeout = Timeout(5.seconds)

(peerManager ? PeerManagerActor.GetPeers)
.mapTo[Peers]
.recover { case _ => Peers(Map.empty[Peer, PeerActor.Status]) }
peerManager
.askFor[Peers](PeerManagerActor.GetPeers)
.onErrorRecover { case _ => Peers(Map.empty[Peer, PeerActor.Status]) }
.map(_.peers.keySet.map(_.id).toList)
}

private def getPeerInfo(peer: PeerId): Future[Option[PeerInfo]] = {
private def getPeerInfo(peer: PeerId): Task[Option[PeerInfo]] = {
implicit val timeout: Timeout = Timeout(5.seconds)

(etcPeerManager ? EtcPeerManagerActor.PeerInfoRequest(peer))
.mapTo[PeerInfoResponse]
.collect { case PeerInfoResponse(info) => info }
etcPeerManager
.askFor[PeerInfoResponse](EtcPeerManagerActor.PeerInfoRequest(peer))
.map(resp => resp.peerInfo)
}
}
Loading