Skip to content

Commit 6ce7899

Browse files
author
Johannes Duesing
committed
Merge branch 'develop' into newInstanceRegistry
2 parents e4bf21c + 1677b67 commit 6ce7899

File tree

6 files changed

+193
-50
lines changed

6 files changed

+193
-50
lines changed

build.sbt

Lines changed: 16 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -4,36 +4,43 @@ version := "1.0.0-SNAPSHOT"
44

55
scalaVersion := "2.12.4"
66

7+
val akkaHttpVersion = "10.1.5"
8+
79
libraryDependencies += "org.parboiled" %% "parboiled" % "2.1.4"
8-
libraryDependencies += "com.typesafe.akka" %% "akka-http" % "10.0.11"
10+
11+
libraryDependencies += "com.typesafe.akka" %% "akka-http" % akkaHttpVersion
12+
libraryDependencies += "com.typesafe.akka" %% "akka-http-testkit" % akkaHttpVersion
913
libraryDependencies += "com.typesafe.akka" %% "akka-stream" % "2.5.12"
10-
libraryDependencies += "com.typesafe.akka" %% "akka-http-spray-json" % "10.1.1"
11-
libraryDependencies += "io.spray" %% "spray-json" % "1.3.3"
12-
libraryDependencies += "org.parboiled" %% "parboiled" % "2.1.4"
14+
libraryDependencies += "com.typesafe.akka" %% "akka-http-spray-json" % akkaHttpVersion
15+
libraryDependencies += "io.spray" %% "spray-json" % "1.3.3"
1316
libraryDependencies += "org.scalactic" %% "scalactic" % "3.0.4"
14-
libraryDependencies += "org.scalatest" %% "scalatest" % "3.0.4" % "test"
17+
libraryDependencies += "org.scalatest" %% "scalatest" % "3.0.4" % "it,test"
18+
libraryDependencies += "org.apache.logging.log4j" % "log4j-core" % "2.11.1"
1519

1620
val elastic4sVersion = "6.3.0"
1721
libraryDependencies ++= Seq(
1822
"com.sksamuel.elastic4s" %% "elastic4s-core" % elastic4sVersion,
1923

24+
2025
// for the http client
2126
"com.sksamuel.elastic4s" %% "elastic4s-http" % elastic4sVersion,
2227

2328
// if you want to use reactive streams
2429
"com.sksamuel.elastic4s" %% "elastic4s-http-streams" % elastic4sVersion,
2530

26-
// testing
27-
"com.sksamuel.elastic4s" %% "elastic4s-testkit" % elastic4sVersion % "test",
28-
"com.sksamuel.elastic4s" %% "elastic4s-embedded" % elastic4sVersion % "test"
2931
)
3032

3133

3234
lazy val webapi = (project in file(".")).
35+
//https://www.scala-sbt.org/1.x/docs/Testing.html
36+
configs(IntegrationTest).
37+
settings(
38+
Defaults.itSettings,
39+
).
3340
enablePlugins(JavaAppPackaging).
3441
enablePlugins(DockerPlugin).
3542
enablePlugins(ScalastylePlugin).
36-
settings (
43+
settings(
3744
dockerBaseImage := "openjdk:jre-alpine"
3845
).
3946
enablePlugins(AshScriptPlugin).
Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
2+
// Copyright (C) 2018 The Delphi Team.
3+
// See the LICENCE file distributed with this work for additional
4+
// information regarding copyright ownership.
5+
//
6+
// Licensed under the Apache License, Version 2.0 (the "License");
7+
// you may not use this file except in compliance with the License.
8+
// You may obtain a copy of the License at
9+
10+
// http://www.apache.org/licenses/LICENSE-2.0
11+
12+
// Unless required by applicable law or agreed to in writing, software
13+
// distributed under the License is distributed on an "AS IS" BASIS,
14+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
// See the License for the specific language governing permissions and
16+
// limitations under the License.
17+
18+
package de.upb.cs.swt.delphi.webapi
19+
20+
import akka.actor.ActorSystem
21+
import akka.http.scaladsl.Http
22+
import akka.http.scaladsl.model.{HttpRequest, HttpResponse}
23+
import akka.http.scaladsl.unmarshalling.Unmarshal
24+
import akka.stream.ActorMaterializer
25+
import com.sksamuel.elastic4s.RefreshPolicy
26+
import com.sksamuel.elastic4s.http.ElasticClient
27+
import com.sksamuel.elastic4s.http.ElasticDsl._
28+
import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}
29+
30+
import scala.concurrent.duration._
31+
import scala.concurrent.{Await, Future}
32+
import scala.util.{Failure, Success}
33+
34+
/**
35+
* @author Hariharan.
36+
*/
37+
class ElasticActorTest extends FlatSpec with Matchers with BeforeAndAfterAll {
38+
implicit val system = ActorSystem()
39+
implicit val materializer = ActorMaterializer()(system)
40+
implicit val executionContext = system.dispatcher
41+
val configuration = new Configuration()
42+
val client = ElasticClient(configuration.elasticsearchClientUri)
43+
44+
override def beforeAll(): Unit = {
45+
client.execute {
46+
indexInto("delphi" / "project").fields(
47+
"name" -> "test:elastic-actor-test:1.0"
48+
).refresh(RefreshPolicy.IMMEDIATE)
49+
}.await
50+
}
51+
52+
53+
override def afterAll(): Unit = {
54+
client.execute {
55+
deleteByQuery("delphi", "project", matchQuery("name", "test:elastic-actor-test:1.0"))
56+
}.await
57+
client.close()
58+
system.terminate()
59+
60+
}
61+
62+
63+
"Version no.." should
64+
"match version from build.sbt" in {
65+
val res: Future[HttpResponse] = Http().singleRequest(HttpRequest(uri = "http://localhost:8080/version"))
66+
res.onComplete {
67+
case Success(ver) => {
68+
assert(ver.status.isSuccess());
69+
val res2Str: Future[String] = Unmarshal(ver.entity).to[String]
70+
res2Str.onComplete {
71+
case Success(value) => {
72+
assert(value.equals(BuildInfo.version))
73+
}
74+
case Failure(e) => {
75+
assertThrows(e);
76+
}
77+
}
78+
}
79+
case Failure(e) => {
80+
assertThrows(e);
81+
}
82+
}
83+
Await.result(res, 2.seconds)
84+
}
85+
86+
"Retrive endpoint" should
87+
"get test:elastic-actor-test:1.0 artifact" in {
88+
val mavenId = "test:elastic-actor-test:1.0"
89+
val url = s"http://localhost:8080/retrieve/${mavenId}"
90+
val res: Future[HttpResponse] = Http().singleRequest(HttpRequest(uri = url))
91+
res.onComplete {
92+
case Success(data) => {
93+
assert(data.status.isSuccess())
94+
val res2Str: Future[String] = Unmarshal(data.entity).to[String]
95+
res2Str.onComplete {
96+
case Success(value) => {
97+
assert(value.contains(mavenId))
98+
}
99+
case Failure(e) => {
100+
assertThrows(e);
101+
}
102+
}
103+
}
104+
case Failure(exception) => {
105+
assertThrows(exception)
106+
}
107+
}
108+
Await.result(res, 2.seconds)
109+
}
110+
111+
}

src/main/scala/de/upb/cs/swt/delphi/instancemanagement/Instance.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,4 +82,4 @@ object InstanceEnums {
8282
val Failed : Value = Value("Failed")
8383
val NotReachable : Value = Value("NotReachable")
8484
}
85-
}
85+
}
Lines changed: 19 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,23 @@
11
package de.upb.cs.swt.delphi.webapi
2-
32
import akka.actor.{Actor, ActorLogging, Props}
43
import com.sksamuel.elastic4s.IndexAndType
54
import com.sksamuel.elastic4s.http.{ElasticClient, RequestFailure, RequestSuccess}
65
import com.sksamuel.elastic4s.http.ElasticDsl._
76
import de.upb.cs.swt.delphi.webapi.ElasticActorManager.{Enqueue, Retrieve}
87

8+
import spray.json._
99
import scala.concurrent.ExecutionContext
1010
import scala.concurrent.duration._
1111

12-
class ElasticActor(configuration: Configuration, index: IndexAndType) extends Actor with ActorLogging{
12+
class ElasticActor(configuration: Configuration, index: IndexAndType) extends Actor with ActorLogging {
1313

1414
implicit val executionContext: ExecutionContext = context.system.dispatchers.lookup("elasticsearch-handling-dispatcher")
1515
val client = ElasticClient(configuration.elasticsearchClientUri)
1616

1717
override def preStart(): Unit = log.info("Search actor started")
18+
1819
override def postStop(): Unit = log.info("Search actor shut down")
20+
1921
context.setReceiveTimeout(2 seconds)
2022

2123
override def receive = {
@@ -25,22 +27,30 @@ class ElasticActor(configuration: Configuration, index: IndexAndType) extends Ac
2527

2628
private def getSource(id: String) = {
2729
log.info("Executing get on entry {}", id)
28-
def queryResponse = client.execute{
30+
val searchByName = searchWithType(index) query must(
31+
matchQuery("name", id)
32+
)
33+
log.info(s"Query {}",client.show(searchByName))
34+
def queryResponse = client.execute {
2935
log.info(s"Got retrieve request for $id.")
30-
searchWithType(index) query must (
31-
matchQuery("name", s"http://repo1.maven.org/maven2/:$id")
32-
)
36+
searchByName
3337
}.await
3438

3539
val source = queryResponse match {
36-
case results: RequestSuccess[_] => results.body.get
40+
case results: RequestSuccess[_] => {
41+
val resObj = results.body.get.parseJson.asJsObject
42+
val hitsObj=resObj.fields.getOrElse("hits", JsObject.empty).asJsObject
43+
val hitsArr=hitsObj.fields.getOrElse("hits",JsArray.empty).asInstanceOf[JsArray]
44+
val source=hitsArr.elements.map(m=>m.asJsObject.fields.get("_source"))
45+
source.head.getOrElse(JsObject.empty).toString()
46+
}
3747
case failure: RequestFailure => Option.empty
3848
}
3949
sender().tell(source, context.self)
4050
}
4151
}
4252

43-
object ElasticActor{
44-
def props(configuration: Configuration, index: IndexAndType) : Props = Props(new ElasticActor(configuration, index))
53+
object ElasticActor {
54+
def props(configuration: Configuration, index: IndexAndType): Props = Props(new ElasticActor(configuration, index))
4555
.withMailbox("es-priority-mailbox")
4656
}

src/main/scala/de/upb/cs/swt/delphi/webapi/Server.scala

Lines changed: 13 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,6 @@ import akka.http.scaladsl.server.HttpApp
99
import akka.pattern.ask
1010
import akka.stream.ActorMaterializer
1111
import akka.util.Timeout
12-
import com.sksamuel.elastic4s.http.ElasticClient
13-
import com.sksamuel.elastic4s.http.ElasticDsl._
1412
import de.upb.cs.swt.delphi.featuredefinitions.FeatureListMapping
1513
import de.upb.cs.swt.delphi.instancemanagement.InstanceRegistry
1614
import de.upb.cs.swt.delphi.webapi.ElasticActorManager.{Enqueue, Retrieve}
@@ -33,13 +31,16 @@ object Server extends HttpApp with JsonSupport with AppLogging {
3331
implicit val timeout = Timeout(5, TimeUnit.SECONDS)
3432
implicit val materializer = ActorMaterializer()
3533

36-
3734
override def routes =
38-
path("version") { version } ~
39-
path("features") { features } ~
40-
pathPrefix("search" / Remaining) { query => search(query) } ~
41-
pathPrefix("retrieve" / Remaining) { identifier => retrieve(identifier) } ~
42-
pathPrefix("enqueue" / Remaining) { identifier => enqueue(identifier) }
35+
path("version") {
36+
version
37+
} ~
38+
path("features") {
39+
features
40+
} ~
41+
pathPrefix("search" / Remaining) { query => search(query) } ~
42+
pathPrefix("retrieve" / Remaining) { identifier => retrieve(identifier) } ~
43+
pathPrefix("enqueue" / Remaining) { identifier => enqueue(identifier) }
4344

4445

4546
private def version = {
@@ -60,11 +61,11 @@ object Server extends HttpApp with JsonSupport with AppLogging {
6061

6162
def retrieve(identifier: String) = {
6263
get {
63-
pass { //TODO: Require authentication here
64+
pass { //TODO: Require authentication here
6465
complete(
6566
(actorManager ? Retrieve(identifier)).mapTo[String]
6667
)
67-
} ~ extractClientIP{ ip =>
68+
} ~ extractClientIP { ip =>
6869
complete(
6970
(requestLimiter ? Validate(ip, Retrieve(identifier))).mapTo[String]
7071
)
@@ -74,7 +75,7 @@ object Server extends HttpApp with JsonSupport with AppLogging {
7475

7576
def enqueue(identifier: String) = {
7677
get {
77-
pass { //TODO: Require authorization here
78+
pass { //TODO: Require authorization here
7879
complete(
7980
(actorManager ? Enqueue(identifier)).mapTo[String]
8081
)
@@ -91,26 +92,7 @@ object Server extends HttpApp with JsonSupport with AppLogging {
9192
}
9293

9394
def main(args: Array[String]): Unit = {
94-
95-
implicit val ec : ExecutionContext = system.dispatcher
96-
lazy val client = ElasticClient(configuration.elasticsearchClientUri)
97-
98-
val f = (client.execute {
99-
nodeInfo()
100-
} map { i => {
101-
if(configuration.usingInstanceRegistry) InstanceRegistry.sendMatchingResult(true, configuration)
102-
Success(configuration)
103-
}
104-
} recover { case e => {
105-
if(configuration.usingInstanceRegistry) InstanceRegistry.sendMatchingResult(false, configuration)
106-
Failure(e)
107-
}
108-
}).andThen {
109-
case _ => client.close()
110-
}
111-
112-
Await.ready(f, Duration.Inf)
113-
95+
StartupCheck.check(configuration)
11496
Server.startServer(configuration.bindHost, configuration.bindPort)
11597
InstanceRegistry.handleInstanceStop(configuration)
11698
system.terminate()
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
package de.upb.cs.swt.delphi.webapi
2+
3+
import akka.actor.ActorSystem
4+
import com.sksamuel.elastic4s.http.ElasticClient
5+
import com.sksamuel.elastic4s.http.ElasticDsl._
6+
import de.upb.cs.swt.delphi.instancemanagement.InstanceRegistry
7+
import scala.concurrent.duration.Duration
8+
import scala.concurrent.{Await, ExecutionContext}
9+
import scala.util.{Failure, Success, Try}
10+
11+
object StartupCheck extends AppLogging {
12+
def check(configuration: Configuration)(implicit system: ActorSystem): Try[Configuration] = {
13+
log.warning("Performing Instance Registry checks")
14+
implicit val ec : ExecutionContext = system.dispatcher
15+
lazy val client = ElasticClient(configuration.elasticsearchClientUri)
16+
17+
val f = (client.execute {
18+
nodeInfo()
19+
} map { i => {
20+
InstanceRegistry.sendMatchingResult(isElasticSearchReachable = true, configuration)
21+
Success(configuration)
22+
}
23+
} recover { case e =>
24+
InstanceRegistry.sendMatchingResult(isElasticSearchReachable = false, configuration)
25+
Failure(e)
26+
27+
}).andThen {
28+
case _ => client.close()
29+
}
30+
31+
Await.result(f, Duration.Inf)
32+
}
33+
}

0 commit comments

Comments
 (0)