Skip to content
26 changes: 20 additions & 6 deletions src/main/scala/io/iohk/ethereum/faucet/FaucetConfig.scala
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package io.iohk.ethereum.faucet

import com.typesafe.config.{ConfigFactory, Config}
import com.typesafe.config.{Config, ConfigFactory}
import io.iohk.ethereum.domain.Address

import scala.concurrent.duration.{FiniteDuration, _}
Expand All @@ -11,36 +11,50 @@ trait FaucetConfigBuilder {
lazy val faucetConfig: FaucetConfig = FaucetConfig(rawConfig)
}

case class RpcClientConfig(
address: String,
timeout: FiniteDuration
)

object RpcClientConfig {
def apply(rpcClientConfig: Config): RpcClientConfig = {

RpcClientConfig(
address = rpcClientConfig.getString("rpc-address"),
timeout = rpcClientConfig.getDuration("timeout").toMillis.millis
)
}
}

case class FaucetConfig(
walletAddress: Address,
walletPassword: String,
txGasPrice: BigInt,
txGasLimit: BigInt,
txValue: BigInt,
rpcAddress: String,
rpcClient: RpcClientConfig,
keyStoreDir: String,
minRequestInterval: FiniteDuration,
handlerTimeout: FiniteDuration,
responseTimeout: FiniteDuration,
actorCommunicationMargin: FiniteDuration,
supervisor: SupervisorConfig,
shutdownTimeout: FiniteDuration
)

object FaucetConfig {
def apply(typesafeConfig: Config): FaucetConfig = {
val faucetConfig = typesafeConfig.getConfig("faucet")

FaucetConfig(
walletAddress = Address(faucetConfig.getString("wallet-address")),
walletPassword = faucetConfig.getString("wallet-password"),
txGasPrice = faucetConfig.getLong("tx-gas-price"),
txGasLimit = faucetConfig.getLong("tx-gas-limit"),
txValue = faucetConfig.getLong("tx-value"),
rpcAddress = faucetConfig.getString("rpc-client.rpc-address"),
rpcClient = RpcClientConfig(faucetConfig.getConfig("rpc-client")),
keyStoreDir = faucetConfig.getString("keystore-dir"),
minRequestInterval = faucetConfig.getDuration("min-request-interval").toMillis.millis,
handlerTimeout = faucetConfig.getDuration("handler-timeout").toMillis.millis,
responseTimeout = faucetConfig.getDuration("response-timeout").toMillis.millis,
actorCommunicationMargin = faucetConfig.getDuration("actor-communication-margin").toMillis.millis,
supervisor = SupervisorConfig(faucetConfig),
shutdownTimeout = faucetConfig.getDuration("shutdown-timeout").toMillis.millis
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ object SupervisorConfig {

SupervisorConfig(
supervisorConfig.getDuration("min-backoff").toMillis.millis,
supervisorConfig.getDuration("man-backoff").toMillis.millis,
supervisorConfig.getDuration("max-backoff").toMillis.millis,
supervisorConfig.getDouble("random-factor"),
supervisorConfig.getDuration("auto-reset").toMillis.millis,
supervisorConfig.getInt("attempts"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,11 @@ trait FaucetRpcServiceBuilder {
)

val walletRpcClient: WalletRpcClient =
new WalletRpcClient(faucetConfig.rpcAddress, () => sslContext("faucet.rpc-client"))
new WalletRpcClient(
faucetConfig.rpcClient.address,
faucetConfig.rpcClient.timeout,
() => sslContext("faucet.rpc-client")
)
val walletService = new WalletService(walletRpcClient, keyStore, faucetConfig)
val faucetSupervisor: FaucetSupervisor = new FaucetSupervisor(walletService, faucetConfig, shutdown)(system)
val faucetRpcService = new FaucetRpcService(faucetConfig)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ class FaucetRpcService(config: FaucetConfig)(implicit system: ActorSystem)
with FaucetHandlerSelector
with Logger {

implicit lazy val actorTimeout: Timeout = Timeout(config.responseTimeout)
implicit lazy val actorTimeout: Timeout = Timeout(config.actorCommunicationMargin + config.rpcClient.timeout)

def sendFunds(sendFundsRequest: SendFundsRequest): ServiceResponse[SendFundsResponse] =
selectFaucetHandler()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,12 @@ import javax.net.ssl.SSLContext
import monix.eval.Task

import scala.concurrent.ExecutionContext
import scala.concurrent.duration.Duration

class WalletRpcClient(node: Uri, getSSLContext: () => Either[SSLError, SSLContext])(implicit
class WalletRpcClient(node: Uri, timeout: Duration, getSSLContext: () => Either[SSLError, SSLContext])(implicit
system: ActorSystem,
ec: ExecutionContext
) extends RpcClient(node, getSSLContext)
) extends RpcClient(node, timeout, getSSLContext)
with Logger {
import io.iohk.ethereum.jsonrpc.client.CommonJsonCodecs._

Expand Down
30 changes: 26 additions & 4 deletions src/main/scala/io/iohk/ethereum/jsonrpc/client/RpcClient.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,11 @@ import java.util.UUID

import akka.actor.ActorSystem
import akka.http.scaladsl.model._
import akka.http.scaladsl.settings.{ClientConnectionSettings, ConnectionPoolSettings}
import akka.http.scaladsl.unmarshalling.Unmarshal
import akka.http.scaladsl.{ConnectionContext, Http, HttpsConnectionContext}
import akka.stream.StreamTcpException
import akka.stream.scaladsl.TcpIdleTimeoutException
import io.circe.generic.auto._
import io.circe.parser.parse
import io.circe.syntax._
Expand All @@ -18,8 +21,9 @@ import javax.net.ssl.SSLContext
import monix.eval.Task

import scala.concurrent.ExecutionContext
import scala.concurrent.duration._

abstract class RpcClient(node: Uri, getSSLContext: () => Either[SSLError, SSLContext])(implicit
abstract class RpcClient(node: Uri, timeout: Duration, getSSLContext: () => Either[SSLError, SSLContext])(implicit
system: ActorSystem,
ec: ExecutionContext
) extends Logger {
Expand All @@ -32,6 +36,12 @@ abstract class RpcClient(node: Uri, getSSLContext: () => Either[SSLError, SSLCon
Http().defaultClientHttpsContext
}

lazy val connectionPoolSettings: ConnectionPoolSettings = ConnectionPoolSettings(system)
.withConnectionSettings(
ClientConnectionSettings(system)
.withIdleTimeout(timeout)
)

protected def doRequest[T: Decoder](method: String, args: Seq[Json]): RpcResponse[T] = {
doJsonRequest(method, args).map(_.flatMap(getResult[T]))
}
Expand Down Expand Up @@ -60,11 +70,21 @@ abstract class RpcClient(node: Uri, getSSLContext: () => Either[SSLError, SSLCon

Task
.deferFuture(for {
response <- Http().singleRequest(request, connectionContext)
response <- Http().singleRequest(request, connectionContext, connectionPoolSettings)
data <- Unmarshal(response.entity).to[String]
} yield parse(data).left.map(e => RpcClientError(e.message)))
} yield parse(data).left.map(e => ParserError(e.message)))
.onErrorHandle { ex: Throwable =>
Left(RpcClientError(s"RPC request failed: ${exceptionToString(ex)}"))
ex match {
case _: TcpIdleTimeoutException =>
log.error("RPC request", ex)
Left(ConnectionError(s"RPC request timeout"))
case _: StreamTcpException =>
log.error("Connection not established", ex)
Left(ConnectionError(s"Connection not established"))
case _ =>
log.error("RPC request failed", ex)
Left(RpcClientError("RPC request failed"))
}
}
}

Expand Down Expand Up @@ -97,5 +117,7 @@ object RpcClient {

case class ParserError(msg: String) extends RpcError

case class ConnectionError(msg: String) extends RpcError

case class RpcClientError(msg: String) extends RpcError
}
5 changes: 3 additions & 2 deletions src/test/resources/application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -156,17 +156,18 @@ faucet {
rpc-client {
rpc-address = "http://127.0.0.1:8546/"
certificate = null
timeout = 2.seconds
}

min-request-interval = 1.minute

handler-timeout = 2.seconds

response-timeout = 2.seconds
actor-communication-margin = 1.seconds

supervisor {
min-backoff = 3.seconds
man-backoff = 30.seconds
max-backoff = 30.seconds
random-factor = 0.2
auto-reset = 10.seconds
attempts = 4
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import io.iohk.ethereum.faucet.FaucetHandler.FaucetHandlerResponse.{
}
import io.iohk.ethereum.faucet.FaucetStatus.WalletAvailable
import io.iohk.ethereum.faucet.jsonrpc.FaucetDomain.{SendFundsRequest, StatusRequest}
import io.iohk.ethereum.faucet.{FaucetConfig, SupervisorConfig}
import io.iohk.ethereum.faucet.{FaucetConfig, RpcClientConfig, SupervisorConfig}
import io.iohk.ethereum.jsonrpc.JsonRpcError
import io.iohk.ethereum.testing.ActorsTesting.simpleAutoPilot
import io.iohk.ethereum.{NormalPatience, WithActorSystemShutDown}
Expand Down Expand Up @@ -133,11 +133,11 @@ class FaucetRpcServiceSpec
txGasPrice = 10,
txGasLimit = 20,
txValue = 1,
rpcAddress = "",
rpcClient = RpcClientConfig(address = "", timeout = 10.seconds),
keyStoreDir = "",
minRequestInterval = 10.seconds,
handlerTimeout = 10.seconds,
responseTimeout = 10.seconds,
actorCommunicationMargin = 10.seconds,
supervisor = mock[SupervisorConfig],
shutdownTimeout = 15.seconds
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@ import java.security.SecureRandom
import akka.util.ByteString
import io.iohk.ethereum.crypto._
import io.iohk.ethereum.domain.{Address, Transaction}
import io.iohk.ethereum.faucet.{FaucetConfig, SupervisorConfig}
import io.iohk.ethereum.faucet.{FaucetConfig, RpcClientConfig, SupervisorConfig}
import io.iohk.ethereum.jsonrpc.client.RpcClient.ConnectionError
import io.iohk.ethereum.keystore.KeyStore.DecryptionFailed
import io.iohk.ethereum.keystore.{KeyStore, Wallet}
import io.iohk.ethereum.network.p2p.messages.CommonMessages.SignedTransactions.SignedTransactionEnc
Expand All @@ -21,7 +22,7 @@ import scala.concurrent.duration._

class WalletServiceSpec extends AnyFlatSpec with Matchers with MockFactory {

"Wallet Service" should "send a transaction" in new TestSetup {
"Wallet Service" should "send a transaction successfully when getNonce and sendTransaction successfully" in new TestSetup {

val receivingAddress = Address("0x99")
val currentNonce = 2
Expand All @@ -44,6 +45,17 @@ class WalletServiceSpec extends AnyFlatSpec with Matchers with MockFactory {

}

it should "failure the transaction when get timeout of getNonce" in new TestSetup {

val timeout = ConnectionError("timeout")
(walletRpcClient.getNonce _).expects(config.walletAddress).returning(Task.pure(Left(timeout)))

val res = walletService.sendFunds(wallet, Address("0x99")).runSyncUnsafe()

res shouldEqual Left(timeout)

}

it should "get wallet successful" in new TestSetup {
(mockKeyStore.unlockAccount _).expects(config.walletAddress, config.walletPassword).returning(Right(wallet))

Expand Down Expand Up @@ -76,11 +88,11 @@ class WalletServiceSpec extends AnyFlatSpec with Matchers with MockFactory {
txGasPrice = 10,
txGasLimit = 20,
txValue = 1,
rpcAddress = "",
rpcClient = RpcClientConfig("", timeout = 10.seconds),
keyStoreDir = "",
minRequestInterval = 10.seconds,
handlerTimeout = 10.seconds,
responseTimeout = 10.seconds,
actorCommunicationMargin = 10.seconds,
supervisor = mock[SupervisorConfig],
shutdownTimeout = 15.seconds
)
Expand Down
7 changes: 5 additions & 2 deletions src/universal/conf/faucet.conf
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ faucet {
# null value indicates HTTPS is not being used
# password-file = "tls/password"
#}

# Response time-out from rpc client resolve
timeout = 3.seconds
}

# How often can a single IP address send a request
Expand All @@ -49,12 +52,12 @@ faucet {
handler-timeout = 1.seconds

# Response time-out from actor resolve
response-timeout = 3.seconds
actor-communication-margin = 1.seconds

# Supervisor with BackoffSupervisor pattern
supervisor {
min-backoff = 3.seconds
man-backoff = 30.seconds
max-backoff = 30.seconds
random-factor = 0.2
auto-reset = 10.seconds
attempts = 4
Expand Down