Skip to content

Commit cf27856

Browse files
author
Dmitry Voronov
committed
[ETCM-266]-replaced-rate-limiter-built-on-twitter
1 parent 8eea376 commit cf27856

File tree

7 files changed

+156
-64
lines changed

7 files changed

+156
-64
lines changed

build.sbt

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,6 @@ lazy val node = {
122122
Dependencies.cats,
123123
Dependencies.monix,
124124
Dependencies.network,
125-
Dependencies.twitterUtilCollection,
126125
Dependencies.crypto,
127126
Dependencies.scopt,
128127
Dependencies.logging,

project/Dependencies.scala

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -94,8 +94,6 @@ object Dependencies {
9494
"org.codehaus.janino" % "janino" % "3.1.2"
9595
)
9696

97-
val twitterUtilCollection = Seq("com.twitter" %% "util-collection" % "19.1.0")
98-
9997
val crypto = Seq("org.bouncycastle" % "bcprov-jdk15on" % "1.66")
10098

10199
val scopt = Seq("com.github.scopt" % "scopt_2.12" % "3.7.1")
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
package io.iohk.ethereum.db.cache
2+
3+
import java.util
4+
5+
/**
6+
* Simplest-possible implementation of insert-time based LRU
7+
*
8+
* @param maxElements - capacity of the inner map, between 0 and this value.
9+
* @param ttlMillis - entry invalidation interval
10+
* @tparam K - stands for the type of the keys
11+
*/
12+
class SimpleLRU[K](maxElements: Int, ttlMillis: Long) {
13+
14+
private[this] val inner = new util.LinkedHashMap[K, Long](maxElements, 0.75f, false) {
15+
override def removeEldestEntry(old: util.Map.Entry[K, Long]): Boolean = {
16+
size() > maxElements || tooOld(old.getValue)
17+
}
18+
}
19+
20+
/**
21+
* @param key - will be searching and added
22+
* @return
23+
* true - if there was such entry already
24+
* false - if not
25+
*/
26+
def checkAndRefreshEntry(key: K): Boolean = inner.synchronized {
27+
val existing = Option(inner.put(key, currentTime))
28+
existing.exists(!tooOld(_))
29+
}
30+
31+
// Override this to test
32+
protected def currentTime: Long = System.currentTimeMillis()
33+
34+
private[this] def tooOld(time: Long) = time + ttlMillis < currentTime
35+
36+
}

src/main/scala/io/iohk/ethereum/jsonrpc/server/http/JsonRpcHttpServer.scala

Lines changed: 26 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ import org.json4s.native.Serialization
2525
import org.json4s.{DefaultFormats, JInt, native}
2626
import scala.concurrent.duration.{FiniteDuration, _}
2727

28-
trait JsonRpcHttpServer extends Json4sSupport with RateLimit with Logger {
28+
trait JsonRpcHttpServer extends Json4sSupport with Logger {
2929
val jsonRpcController: JsonRpcBaseController
3030
val jsonRpcHealthChecker: JsonRpcHealthChecker
3131
val config: JsonRpcHttpServerConfig
@@ -54,37 +54,38 @@ trait JsonRpcHttpServer extends Json4sSupport with RateLimit with Logger {
5454
}
5555
.result()
5656

57+
val rateLimit = new RateLimit(config.rateLimit)
58+
5759
val route: Route = cors(corsSettings) {
5860
(path("healthcheck") & pathEndOrSingleSlash & get) {
5961
handleHealthcheck()
6062
} ~ (path("buildinfo") & pathEndOrSingleSlash & get) {
6163
handleBuildInfo()
6264
} ~ (pathEndOrSingleSlash & post) {
63-
(extractClientIP & entity(as[JsonRpcRequest])) { (clientAddress, request) =>
64-
handleRequest(clientAddress, request)
65-
} ~ entity(as[Seq[JsonRpcRequest]]) { request =>
66-
handleBatchRequest(request)
65+
// TODO: maybe rate-limit this one too?
66+
entity(as[JsonRpcRequest]) {
67+
case statusReq if statusReq.method == FaucetJsonRpcController.Status =>
68+
handleRequest(statusReq)
69+
case jsonReq => rateLimit {
70+
handleRequest(jsonReq)
71+
}
72+
// TODO: separate paths for single and multiple requests
73+
// TODO: to prevent repeated body and json parsing
74+
} ~ entity(as[Seq[JsonRpcRequest]]) {
75+
case _ if config.rateLimit.enabled =>
76+
complete(StatusCodes.MethodNotAllowed, JsonRpcError.MethodNotFound)
77+
case reqSeq =>
78+
complete {
79+
Task
80+
.traverse(reqSeq)(request => jsonRpcController.handleRequest(request))
81+
.runToFuture
82+
}
6783
}
6884
}
6985
}
7086

71-
def handleRequest(clientAddress: RemoteAddress, request: JsonRpcRequest): StandardRoute = {
72-
//FIXME: FaucetJsonRpcController.Status should be part of a Healthcheck request or alike.
73-
// As a temporary solution, it is being excluded from the Rate Limit.
74-
if (config.rateLimit.enabled && request.method != FaucetJsonRpcController.Status) {
75-
handleRateLimitedRequest(clientAddress, request)
76-
} else complete(handleResponse(jsonRpcController.handleRequest(request)).runToFuture)
77-
}
78-
79-
def handleRateLimitedRequest(clientAddress: RemoteAddress, request: JsonRpcRequest): StandardRoute = {
80-
if (isBelowRateLimit(clientAddress))
81-
complete(handleResponse(jsonRpcController.handleRequest(request)).runToFuture)
82-
else {
83-
log.warn(s"Request limit exceeded for ip ${clientAddress.toIP.getOrElse("unknown")}")
84-
complete(
85-
(StatusCodes.TooManyRequests, JsonRpcError.RateLimitError(config.rateLimit.minRequestInterval.toSeconds))
86-
)
87-
}
87+
def handleRequest(request: JsonRpcRequest): StandardRoute = {
88+
complete(handleResponse(jsonRpcController.handleRequest(request)).runToFuture)
8889
}
8990

9091
private def handleResponse(f: Task[JsonRpcResponse]): Task[(StatusCode, JsonRpcResponse)] = f map { jsonRpcResponse =>
@@ -128,15 +129,6 @@ trait JsonRpcHttpServer extends Json4sSupport with RateLimit with Logger {
128129
)
129130
}
130131

131-
private def handleBatchRequest(requests: Seq[JsonRpcRequest]) = {
132-
if (!config.rateLimit.enabled) {
133-
complete {
134-
Task
135-
.traverse(requests)(request => jsonRpcController.handleRequest(request))
136-
.runToFuture
137-
}
138-
} else complete(StatusCodes.MethodNotAllowed, JsonRpcError.MethodNotFound)
139-
}
140132
}
141133

142134
object JsonRpcHttpServer extends Logger {
@@ -160,12 +152,15 @@ object JsonRpcHttpServer extends Logger {
160152
}
161153

162154
trait RateLimitConfig {
155+
// TODO: Move the rateLimit.enabled setting upwards:
156+
// TODO: If we don't need to limit the request rate at all - we don't have to define the other settings
163157
val enabled: Boolean
164158
val minRequestInterval: FiniteDuration
165159
val latestTimestampCacheSize: Int
166160
}
167161

168162
object RateLimitConfig {
163+
// TODO: Use pureconfig
169164
def apply(rateLimitConfig: TypesafeConfig): RateLimitConfig =
170165
new RateLimitConfig {
171166
override val enabled: Boolean = rateLimitConfig.getBoolean("enabled")
Lines changed: 38 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,25 +1,42 @@
11
package io.iohk.ethereum.jsonrpc.server.http
22

3-
import java.time.Clock
4-
5-
import akka.http.scaladsl.model.RemoteAddress
6-
import com.twitter.util.LruMap
7-
import io.iohk.ethereum.jsonrpc.server.http.JsonRpcHttpServer.JsonRpcHttpServerConfig
8-
9-
trait RateLimit {
10-
11-
val config: JsonRpcHttpServerConfig
12-
13-
val latestRequestTimestamps = new LruMap[RemoteAddress, Long](config.rateLimit.latestTimestampCacheSize)
14-
15-
val clock: Clock = Clock.systemUTC()
16-
17-
def isBelowRateLimit(clientAddress: RemoteAddress): Boolean = {
18-
val timeMillis = clock.instant().toEpochMilli
19-
val latestRequestTimestamp = latestRequestTimestamps.getOrElse(clientAddress, 0L)
3+
import akka.http.scaladsl.model.{RemoteAddress, StatusCodes}
4+
import akka.http.scaladsl.server.{Directive0, Route}
5+
import io.iohk.ethereum.db.cache.SimpleLRU
6+
import io.iohk.ethereum.jsonrpc.server.http.JsonRpcHttpServer.RateLimitConfig
7+
import akka.http.scaladsl.server.Directives._
8+
import io.iohk.ethereum.jsonrpc.JsonRpcError
9+
import de.heikoseeberger.akkahttpjson4s.Json4sSupport
10+
import io.iohk.ethereum.jsonrpc.serialization.JsonSerializers
11+
import org.json4s.{DefaultFormats, Formats, Serialization, native}
12+
13+
14+
class RateLimit(config: RateLimitConfig) extends Directive0 with Json4sSupport {
15+
16+
private implicit val serialization: Serialization = native.Serialization
17+
private implicit val formats: Formats = DefaultFormats + JsonSerializers.RpcErrorJsonSerializer
18+
19+
// It determines whether a request needs to be blocked
20+
// Such algebras prevent if-elseif-else boilerplate in the JsonRPCServer code
21+
val blockingAlgebra: (RemoteAddress => Boolean) = {
22+
if (config.enabled) {
23+
val lru = new SimpleLRU[RemoteAddress](
24+
config.latestTimestampCacheSize,
25+
config.minRequestInterval.toMillis
26+
)
27+
lru.checkAndRefreshEntry
28+
} else {
29+
_ => false
30+
}
31+
}
2032

21-
val response = latestRequestTimestamp + config.rateLimit.minRequestInterval.toMillis < timeMillis
22-
if (response) latestRequestTimestamps.put(clientAddress, timeMillis)
23-
response
33+
override def tapply(f: Unit => Route): Route = extractClientIP { ip =>
34+
if (blockingAlgebra(ip)) {
35+
val err = JsonRpcError.RateLimitError(config.minRequestInterval.toSeconds)
36+
complete((StatusCodes.TooManyRequests, err))
37+
} else {
38+
f.apply( () )
39+
}
2440
}
25-
}
41+
42+
}
Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
package io.iohk.ethereum.db.cache
2+
3+
import org.scalatest.wordspec.AnyWordSpec
4+
5+
class SimpleLRUSpec extends AnyWordSpec {
6+
7+
var time = 0L
8+
9+
private object MockedLRU extends SimpleLRU[Int](10, 100) {
10+
override protected def currentTime: Long = SimpleLRUSpec.this.time
11+
}
12+
13+
"It" should {
14+
15+
"Respond false with all missing entries and preserve length" in {
16+
val results = (0 until 100).map { i => MockedLRU.checkAndRefreshEntry(i) }
17+
assert( results.forall( _ == false ) )
18+
}
19+
20+
"Drop the records according to maxElements" in {
21+
val existing = (90 until 100).map { i => MockedLRU.checkAndRefreshEntry(i) }
22+
assert( existing.forall( _ == true ) )
23+
24+
// maxElements guaranteed that we have no space for those
25+
val absent = (0 until 10).map { i => MockedLRU.checkAndRefreshEntry(i) }
26+
assert( absent.forall( _ == false ) )
27+
}
28+
29+
"Obsolete the records according to ttlMillis" in {
30+
this.time = 0L
31+
var results = (0 until 5).map { i => MockedLRU.checkAndRefreshEntry(i) }
32+
assert( results.forall( _ == true ) )
33+
34+
this.time = 50L
35+
results = (5 until 10).map { i => MockedLRU.checkAndRefreshEntry(i) }
36+
assert( results.forall( _ == true ) )
37+
38+
this.time = 150L
39+
results = (0 until 5).map { i => MockedLRU.checkAndRefreshEntry(i) }
40+
assert( results.forall( _ == false ) )
41+
42+
// those were added at 50ms and still valid
43+
results = (5 until 10).map { i => MockedLRU.checkAndRefreshEntry(i) }
44+
assert( results.forall( _ == true ) )
45+
46+
this.time = 300L
47+
results = (0 until 10).map { i => MockedLRU.checkAndRefreshEntry(i) }
48+
assert( results.forall( _ == false ) )
49+
}
50+
51+
}
52+
53+
}

src/test/scala/io/iohk/ethereum/jsonrpc/server/http/JsonRpcHttpServerSpec.scala

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -215,7 +215,7 @@ class JsonRpcHttpServerSpec extends AnyFlatSpec with Matchers with ScalatestRout
215215
status shouldEqual StatusCodes.TooManyRequests
216216
}
217217

218-
fakeClock.advanceTime(2 * serverConfigWithRateLimit.rateLimit.minRequestInterval.toMillis)
218+
Thread.sleep(30)
219219

220220
postRequest ~> Route.seal(mockJsonRpcHttpServerWithRateLimit.route) ~> check {
221221
status shouldEqual StatusCodes.OK
@@ -382,7 +382,7 @@ class JsonRpcHttpServerSpec extends AnyFlatSpec with Matchers with ScalatestRout
382382

383383
val rateLimitConfig = new RateLimitConfig {
384384
override val enabled: Boolean = false
385-
override val minRequestInterval: FiniteDuration = FiniteDuration.apply(5, TimeUnit.SECONDS)
385+
override val minRequestInterval: FiniteDuration = FiniteDuration.apply(20, TimeUnit.MILLISECONDS)
386386
override val latestTimestampCacheSize: Int = 1024
387387
}
388388

@@ -397,7 +397,7 @@ class JsonRpcHttpServerSpec extends AnyFlatSpec with Matchers with ScalatestRout
397397

398398
val rateLimitEnabledConfig = new RateLimitConfig {
399399
override val enabled: Boolean = true
400-
override val minRequestInterval: FiniteDuration = FiniteDuration.apply(5, TimeUnit.SECONDS)
400+
override val minRequestInterval: FiniteDuration = FiniteDuration.apply(20, TimeUnit.MILLISECONDS)
401401
override val latestTimestampCacheSize: Int = 1024
402402
}
403403

@@ -412,14 +412,12 @@ class JsonRpcHttpServerSpec extends AnyFlatSpec with Matchers with ScalatestRout
412412

413413
val mockJsonRpcController = mock[JsonRpcController]
414414
val mockJsonRpcHealthChecker = mock[JsonRpcHealthChecker]
415-
val fakeClock = new FakeClock
416415

417416
val mockJsonRpcHttpServer = new FakeJsonRpcHttpServer(
418417
jsonRpcController = mockJsonRpcController,
419418
jsonRpcHealthChecker = mockJsonRpcHealthChecker,
420419
config = serverConfig,
421420
cors = serverConfig.corsAllowedOrigins,
422-
testClock = fakeClock
423421
)
424422

425423
val corsAllowedOrigin = HttpOrigin("http://localhost:3333")
@@ -428,15 +426,13 @@ class JsonRpcHttpServerSpec extends AnyFlatSpec with Matchers with ScalatestRout
428426
jsonRpcHealthChecker = mockJsonRpcHealthChecker,
429427
config = serverConfig,
430428
cors = HttpOriginMatcher(corsAllowedOrigin),
431-
testClock = fakeClock
432429
)
433430

434431
val mockJsonRpcHttpServerWithRateLimit = new FakeJsonRpcHttpServer(
435432
jsonRpcController = mockJsonRpcController,
436433
jsonRpcHealthChecker = mockJsonRpcHealthChecker,
437434
config = serverConfigWithRateLimit,
438435
cors = serverConfigWithRateLimit.corsAllowedOrigins,
439-
testClock = fakeClock
440436
)
441437
}
442438
}
@@ -468,13 +464,11 @@ class FakeJsonRpcHttpServer(
468464
val jsonRpcHealthChecker: JsonRpcHealthChecker,
469465
val config: JsonRpcHttpServerConfig,
470466
val cors: HttpOriginMatcher,
471-
val testClock: Clock
472467
)(implicit val actorSystem: ActorSystem)
473468
extends JsonRpcHttpServer
474469
with Logger {
475470
def run(): Unit = ()
476471
override def corsAllowedOrigins: HttpOriginMatcher = cors
477-
override val clock = testClock
478472
}
479473

480474
class FakeClock extends Clock {

0 commit comments

Comments
 (0)