Skip to content

Commit 3f69e79

Browse files
committed
Added Enqueue route, using a priority mailbox
1 parent 769c2b6 commit 3f69e79

File tree

6 files changed

+57
-10
lines changed

6 files changed

+57
-10
lines changed
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
es-priority-mailbox {
2+
mailbox-type = "de.upb.cs.swt.delphi.webapi.ElasticPriorityMailbox"
3+
}
4+
5+
akka.actor.deployment {
6+
/espriomailboxactor {
7+
mailbox = es-priority-mailbox
8+
}
9+
}
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
package de.upb.cs.swt.delphi.webapi
2+
3+
import akka.actor.{ActorSystem, ExtendedActorSystem}
4+
import akka.event.{BusLogging, LoggingAdapter}
5+
6+
trait AppLogging {
7+
def log(implicit system: ActorSystem): LoggingAdapter = new BusLogging(system.eventStream, this.getClass.getName, this.getClass, system.asInstanceOf[ExtendedActorSystem].logFilter)
8+
}

src/main/scala/de/upb/cs/swt/delphi/webapi/ElasticActor.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
package de.upb.cs.swt.delphi.webapi
22

3-
import akka.actor.{Actor, ActorLogging, PoisonPill, Props, ReceiveTimeout}
3+
import akka.actor.{Actor, ActorLogging, Props, ReceiveTimeout}
44
import com.sksamuel.elastic4s.IndexAndType
55
import com.sksamuel.elastic4s.http.ElasticDsl._
66
import com.sksamuel.elastic4s.http.HttpClient

src/main/scala/de/upb/cs/swt/delphi/webapi/ElasticActorManager.scala

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ package de.upb.cs.swt.delphi.webapi
22

33
import akka.actor.{Actor, ActorLogging, Props}
44
import com.sksamuel.elastic4s.http.ElasticDsl._
5-
import de.upb.cs.swt.delphi.webapi.ElasticActorManager.Retrieve
5+
import de.upb.cs.swt.delphi.webapi.ElasticActorManager.{Enqueue, Retrieve}
66
import de.upb.cs.swt.delphi.webapi.ElasticActor.GetSource
77

88
class ElasticActorManager(configuration: Configuration) extends Actor with ActorLogging{
@@ -13,16 +13,21 @@ class ElasticActorManager(configuration: Configuration) extends Actor with Actor
1313
override def postStop(): Unit = log.info("Actor manager shut down")
1414

1515
override def receive = {
16-
case Retrieve(id) => {
17-
log.info("Creating actor to search for entry {}", id)
18-
val retrieveActor = context.actorOf(ElasticActor.props(configuration))
19-
retrieveActor forward GetSource(id, index)
20-
}
16+
case Retrieve(id) => getSource(id)
17+
case Enqueue(id) => getSource(id)
18+
}
19+
20+
private def getSource(id: String) = {
21+
log.info("Creating actor to search for entry {}", id)
22+
val retrieveActor = context.actorOf(ElasticActor.props(configuration))
23+
retrieveActor forward GetSource(id, index)
2124
}
2225
}
2326

2427
object ElasticActorManager{
2528
def props(configuration: Configuration) : Props = Props(new ElasticActorManager(configuration))
29+
.withMailbox("es-priority-mailbox")
2630

2731
final case class Retrieve(id: String)
32+
final case class Enqueue(id: String)
2833
}
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
package de.upb.cs.swt.delphi.webapi
2+
3+
import akka.actor.ActorSystem
4+
import akka.dispatch.{PriorityGenerator, UnboundedStablePriorityMailbox}
5+
import de.upb.cs.swt.delphi.webapi.ElasticActorManager.{Enqueue, Retrieve}
6+
import com.typesafe.config.Config
7+
8+
class ElasticPriorityMailbox (settings: ActorSystem.Settings, config: Config)
9+
extends UnboundedStablePriorityMailbox(
10+
PriorityGenerator{
11+
case Retrieve(_) => 5
12+
case Enqueue(_) => 1
13+
case _ => 2
14+
})

src/main/scala/de/upb/cs/swt/delphi/webapi/Server.scala

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,13 +7,13 @@ import akka.http.scaladsl.server.HttpApp
77
import akka.pattern.ask
88
import akka.util.Timeout
99
import de.upb.cs.swt.delphi.featuredefinitions.FeatureListMapping
10-
import de.upb.cs.swt.delphi.webapi.ElasticActorManager.Retrieve
10+
import de.upb.cs.swt.delphi.webapi.ElasticActorManager.{Enqueue, Retrieve}
1111
import spray.json._
1212

1313
/**
1414
* Web server configuration for Delphi web API.
1515
*/
16-
object Server extends HttpApp with JsonSupport {
16+
object Server extends HttpApp with JsonSupport with AppLogging {
1717

1818
private val configuration = new Configuration()
1919
private val system = ActorSystem("delphi-webapi")
@@ -24,7 +24,8 @@ object Server extends HttpApp with JsonSupport {
2424
path("version") { version } ~
2525
path("features") { features } ~
2626
pathPrefix("search" / Remaining) { query => search(query) } ~
27-
pathPrefix("retrieve" / Remaining) { identifier => retrieve(identifier) }
27+
pathPrefix("retrieve" / Remaining) { identifier => retrieve(identifier) } ~
28+
pathPrefix("enqueue" / Remaining) { identifier => enqueue(identifier) }
2829

2930

3031
private def version = {
@@ -51,6 +52,16 @@ object Server extends HttpApp with JsonSupport {
5152
}
5253
}
5354

55+
def enqueue(identifier: String) = {
56+
get {
57+
pass { //TODO: Require authorization here
58+
complete(
59+
(actorManager ? Enqueue(identifier)).mapTo[String]
60+
)
61+
}
62+
}
63+
}
64+
5465
def search(query: String) = {
5566
get {
5667
complete {

0 commit comments

Comments
 (0)