Skip to content

Commit 769c2b6

Browse files
committed
Refactored Retrieve to use Actors
1 parent 50de868 commit 769c2b6

File tree

4 files changed

+80
-21
lines changed

4 files changed

+80
-21
lines changed
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
package de.upb.cs.swt.delphi.webapi
2+
3+
import akka.actor.{Actor, ActorLogging, PoisonPill, Props, ReceiveTimeout}
4+
import com.sksamuel.elastic4s.IndexAndType
5+
import com.sksamuel.elastic4s.http.ElasticDsl._
6+
import com.sksamuel.elastic4s.http.HttpClient
7+
import de.upb.cs.swt.delphi.webapi.ElasticActor.GetSource
8+
9+
import scala.concurrent.duration._
10+
11+
class ElasticActor(configuration: Configuration) extends Actor with ActorLogging{
12+
13+
val client = HttpClient(configuration.elasticsearchClientUri)
14+
15+
override def preStart(): Unit = log.info("Search actor started")
16+
override def postStop(): Unit = log.info("Search actor shut down")
17+
context.setReceiveTimeout(2 seconds)
18+
19+
override def receive = {
20+
case GetSource(id, index) => {
21+
log.info("Executing get on entry {}", id)
22+
def source = client.execute{
23+
get(id).from(index)
24+
}.await match {
25+
case Right(res) => res.body.get
26+
case Left(_) => Option.empty
27+
}
28+
sender().tell(source, context.self)
29+
context.stop(self)
30+
}
31+
case ReceiveTimeout => context.stop(self)
32+
}
33+
}
34+
35+
object ElasticActor{
36+
def props(configuration: Configuration) : Props = Props(new ElasticActor(configuration))
37+
38+
final case class GetSource(id: String, index: IndexAndType)
39+
}
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
package de.upb.cs.swt.delphi.webapi
2+
3+
import akka.actor.{Actor, ActorLogging, Props}
4+
import com.sksamuel.elastic4s.http.ElasticDsl._
5+
import de.upb.cs.swt.delphi.webapi.ElasticActorManager.Retrieve
6+
import de.upb.cs.swt.delphi.webapi.ElasticActor.GetSource
7+
8+
class ElasticActorManager(configuration: Configuration) extends Actor with ActorLogging{
9+
10+
private val index = "delphi" / "project"
11+
12+
override def preStart(): Unit = log.info("Actor manager started")
13+
override def postStop(): Unit = log.info("Actor manager shut down")
14+
15+
override def receive = {
16+
case Retrieve(id) => {
17+
log.info("Creating actor to search for entry {}", id)
18+
val retrieveActor = context.actorOf(ElasticActor.props(configuration))
19+
retrieveActor forward GetSource(id, index)
20+
}
21+
}
22+
}
23+
24+
object ElasticActorManager{
25+
def props(configuration: Configuration) : Props = Props(new ElasticActorManager(configuration))
26+
27+
final case class Retrieve(id: String)
28+
}

src/main/scala/de/upb/cs/swt/delphi/webapi/ElasticClient.scala

Lines changed: 0 additions & 20 deletions
This file was deleted.

src/main/scala/de/upb/cs/swt/delphi/webapi/Server.scala

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,25 @@
11
package de.upb.cs.swt.delphi.webapi
22

3+
import java.util.concurrent.TimeUnit
4+
5+
import akka.actor.ActorSystem
36
import akka.http.scaladsl.server.HttpApp
7+
import akka.pattern.ask
8+
import akka.util.Timeout
49
import de.upb.cs.swt.delphi.featuredefinitions.FeatureListMapping
10+
import de.upb.cs.swt.delphi.webapi.ElasticActorManager.Retrieve
511
import spray.json._
612

713
/**
814
* Web server configuration for Delphi web API.
915
*/
1016
object Server extends HttpApp with JsonSupport {
1117

18+
private val configuration = new Configuration()
19+
private val system = ActorSystem("delphi-webapi")
20+
private val actorManager = system.actorOf(ElasticActorManager.props(configuration))
21+
implicit val timeout = Timeout(5, TimeUnit.SECONDS)
22+
1223
override def routes =
1324
path("version") { version } ~
1425
path("features") { features } ~
@@ -35,7 +46,7 @@ object Server extends HttpApp with JsonSupport {
3546
def retrieve(identifier: String) = {
3647
get {
3748
complete(
38-
ElasticClient.getSource(identifier)
49+
(actorManager ? Retrieve(identifier)).mapTo[String]
3950
)
4051
}
4152
}
@@ -51,6 +62,7 @@ object Server extends HttpApp with JsonSupport {
5162
def main(args: Array[String]): Unit = {
5263
val configuration = new Configuration()
5364
Server.startServer(configuration.bindHost, configuration.bindPort)
65+
system.terminate()
5466
}
5567

5668

0 commit comments

Comments
 (0)