Skip to content

Commit 39b0374

Browse files
committed
Added Actor that limits request frequency by IP.
This actor forwards requests to another actor, and blocks IPs for a certain amount of time if they make too many requests within a given window. It is not currently functioning, as there is not registration feature in this version, but it should work with a minor modification to the retrieve route.
1 parent 3f69e79 commit 39b0374

File tree

5 files changed

+105
-5
lines changed

5 files changed

+105
-5
lines changed

src/main/resources/application.conf

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,4 +6,12 @@ akka.actor.deployment {
66
/espriomailboxactor {
77
mailbox = es-priority-mailbox
88
}
9+
}
10+
11+
akka {
12+
http {
13+
server {
14+
remote-address-header = on
15+
}
16+
}
917
}

src/main/scala/de/upb/cs/swt/delphi/webapi/ElasticActorManager.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,6 @@ object ElasticActorManager{
2828
def props(configuration: Configuration) : Props = Props(new ElasticActorManager(configuration))
2929
.withMailbox("es-priority-mailbox")
3030

31-
final case class Retrieve(id: String)
32-
final case class Enqueue(id: String)
31+
final case class Retrieve(id: String) extends ElasticMessage
32+
final case class Enqueue(id: String) extends ElasticMessage
3333
}
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
package de.upb.cs.swt.delphi.webapi
2+
3+
trait ElasticMessage
Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
package de.upb.cs.swt.delphi.webapi
2+
3+
4+
import akka.actor.{Actor, ActorLogging, ActorRef, Props}
5+
import akka.actor.Timers
6+
import akka.http.scaladsl.model.RemoteAddress
7+
import de.upb.cs.swt.delphi.webapi.ElasticRequestLimiter._
8+
9+
import scala.concurrent.duration._
10+
import scala.collection.mutable
11+
12+
//Limits the number of requests any given IP can make by tracking how many requests an IP has made within a given
13+
// window of time, and timing out any IP that exceeds a threshold by rejecting any further request for a period of time
14+
class ElasticRequestLimiter(configuration: Configuration, nextActor: ActorRef) extends Actor with ActorLogging with Timers {
15+
16+
private val window = 1 second
17+
private val threshold = 10
18+
private val timeout = 2 hours
19+
20+
private var recentIPs: mutable.Map[String, Int] = mutable.Map()
21+
private var blockedIPs: mutable.Set[String] = mutable.Set()
22+
23+
override def preStart(): Unit = {
24+
log.info("Request limiter started")
25+
timers.startPeriodicTimer(ClearTimer, ClearLogs, window)
26+
}
27+
override def postStop(): Unit = log.info("Request limiter shut down")
28+
29+
override def receive = {
30+
case Validate(rawIp, message) => {
31+
val ip = rawIp.toOption.map(_.getHostAddress).getOrElse("unknown")
32+
//First, reject IPs marked as blocked
33+
if (blockedIPs.contains(ip)) {
34+
rejectRequest()
35+
} else {
36+
//Check if this IP has made any requests recently
37+
if (recentIPs.contains(ip)) {
38+
//If so, increment their counter and test if they have exceeded the request threshold
39+
recentIPs.update(ip, recentIPs(ip) + 1)
40+
if (recentIPs(ip) > threshold) {
41+
//If the threshold has been exceeded, mark this IP as blocked and reject it, and set up a message to unblock it after a period
42+
blockedIPs += ip
43+
log.info("Blocked IP {} due to exceeding request frequency threshold", ip)
44+
timers.startSingleTimer(ForgiveTimer(ip), Forgive(ip), timeout)
45+
rejectRequest()
46+
} else {
47+
//Else, forward this message
48+
nextActor forward message
49+
}
50+
} else {
51+
//Else, register their request in the map and pass it to the next actor
52+
recentIPs += (ip -> 1)
53+
nextActor forward message
54+
}
55+
}
56+
}
57+
case ClearLogs =>
58+
recentIPs.clear()
59+
case Forgive(ip) => {
60+
blockedIPs -= ip
61+
log.info("Forgave IP {} after timeout", ip)
62+
}
63+
}
64+
65+
//Rejects requests from blocked IPs
66+
private def rejectRequest() =
67+
sender() ! "Sorry, you have exceeded the limit on request frequency for unregistered users.\n" +
68+
"As a result, you have been timed out.\n" +
69+
"Please wait a while or register an account with us to continue using this service."
70+
}
71+
72+
object ElasticRequestLimiter{
73+
def props(configuration: Configuration, nextActor: ActorRef) : Props = Props(new ElasticRequestLimiter(configuration, nextActor))
74+
75+
final case class Validate(rawIp: RemoteAddress, message: ElasticMessage)
76+
final case object ClearLogs
77+
final case class Forgive(ip: String)
78+
79+
final case object ClearTimer
80+
final case class ForgiveTimer(ip: String)
81+
}

src/main/scala/de/upb/cs/swt/delphi/webapi/Server.scala

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import akka.pattern.ask
88
import akka.util.Timeout
99
import de.upb.cs.swt.delphi.featuredefinitions.FeatureListMapping
1010
import de.upb.cs.swt.delphi.webapi.ElasticActorManager.{Enqueue, Retrieve}
11+
import de.upb.cs.swt.delphi.webapi.ElasticRequestLimiter.Validate
1112
import spray.json._
1213

1314
/**
@@ -18,6 +19,7 @@ object Server extends HttpApp with JsonSupport with AppLogging {
1819
private val configuration = new Configuration()
1920
private val system = ActorSystem("delphi-webapi")
2021
private val actorManager = system.actorOf(ElasticActorManager.props(configuration))
22+
private val requestLimiter = system.actorOf(ElasticRequestLimiter.props(configuration, actorManager))
2123
implicit val timeout = Timeout(5, TimeUnit.SECONDS)
2224

2325
override def routes =
@@ -46,9 +48,15 @@ object Server extends HttpApp with JsonSupport with AppLogging {
4648

4749
def retrieve(identifier: String) = {
4850
get {
49-
complete(
50-
(actorManager ? Retrieve(identifier)).mapTo[String]
51-
)
51+
pass { //TODO: Require authentication here
52+
complete(
53+
(actorManager ? Retrieve(identifier)).mapTo[String]
54+
)
55+
} ~ extractClientIP{ ip =>
56+
complete(
57+
(requestLimiter ? Validate(ip, Retrieve(identifier))).mapTo[String]
58+
)
59+
}
5260
}
5361
}
5462

0 commit comments

Comments
 (0)