Skip to content

Commit cdad67c

Browse files
authored
Merge branch 'develop' into feature/instanceregistry
2 parents a9f8b2e + ee037e9 commit cdad67c

File tree

4 files changed

+158
-27
lines changed

4 files changed

+158
-27
lines changed

build.sbt

Lines changed: 16 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -4,36 +4,43 @@ version := "1.0.0-SNAPSHOT"
44

55
scalaVersion := "2.12.4"
66

7+
val akkaHttpVersion = "10.1.5"
8+
79
libraryDependencies += "org.parboiled" %% "parboiled" % "2.1.4"
8-
libraryDependencies += "com.typesafe.akka" %% "akka-http" % "10.0.11"
10+
11+
libraryDependencies += "com.typesafe.akka" %% "akka-http" % akkaHttpVersion
12+
libraryDependencies += "com.typesafe.akka" %% "akka-http-testkit" % akkaHttpVersion
913
libraryDependencies += "com.typesafe.akka" %% "akka-stream" % "2.5.12"
10-
libraryDependencies += "com.typesafe.akka" %% "akka-http-spray-json" % "10.1.1"
11-
libraryDependencies += "io.spray" %% "spray-json" % "1.3.3"
12-
libraryDependencies += "org.parboiled" %% "parboiled" % "2.1.4"
14+
libraryDependencies += "com.typesafe.akka" %% "akka-http-spray-json" % akkaHttpVersion
15+
libraryDependencies += "io.spray" %% "spray-json" % "1.3.3"
1316
libraryDependencies += "org.scalactic" %% "scalactic" % "3.0.4"
14-
libraryDependencies += "org.scalatest" %% "scalatest" % "3.0.4" % "test"
17+
libraryDependencies += "org.scalatest" %% "scalatest" % "3.0.4" % "it,test"
18+
libraryDependencies += "org.apache.logging.log4j" % "log4j-core" % "2.11.1"
1519

1620
val elastic4sVersion = "6.3.0"
1721
libraryDependencies ++= Seq(
1822
"com.sksamuel.elastic4s" %% "elastic4s-core" % elastic4sVersion,
1923

24+
2025
// for the http client
2126
"com.sksamuel.elastic4s" %% "elastic4s-http" % elastic4sVersion,
2227

2328
// if you want to use reactive streams
2429
"com.sksamuel.elastic4s" %% "elastic4s-http-streams" % elastic4sVersion,
2530

26-
// testing
27-
"com.sksamuel.elastic4s" %% "elastic4s-testkit" % elastic4sVersion % "test",
28-
"com.sksamuel.elastic4s" %% "elastic4s-embedded" % elastic4sVersion % "test"
2931
)
3032

3133

3234
lazy val webapi = (project in file(".")).
35+
//https://www.scala-sbt.org/1.x/docs/Testing.html
36+
configs(IntegrationTest).
37+
settings(
38+
Defaults.itSettings,
39+
).
3340
enablePlugins(JavaAppPackaging).
3441
enablePlugins(DockerPlugin).
3542
enablePlugins(ScalastylePlugin).
36-
settings (
43+
settings(
3744
dockerBaseImage := "openjdk:jre-alpine"
3845
).
3946
enablePlugins(AshScriptPlugin).
Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
2+
// Copyright (C) 2018 The Delphi Team.
3+
// See the LICENCE file distributed with this work for additional
4+
// information regarding copyright ownership.
5+
//
6+
// Licensed under the Apache License, Version 2.0 (the "License");
7+
// you may not use this file except in compliance with the License.
8+
// You may obtain a copy of the License at
9+
10+
// http://www.apache.org/licenses/LICENSE-2.0
11+
12+
// Unless required by applicable law or agreed to in writing, software
13+
// distributed under the License is distributed on an "AS IS" BASIS,
14+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
// See the License for the specific language governing permissions and
16+
// limitations under the License.
17+
18+
package de.upb.cs.swt.delphi.webapi
19+
20+
import akka.actor.ActorSystem
21+
import akka.http.scaladsl.Http
22+
import akka.http.scaladsl.model.{HttpRequest, HttpResponse}
23+
import akka.http.scaladsl.unmarshalling.Unmarshal
24+
import akka.stream.ActorMaterializer
25+
import com.sksamuel.elastic4s.RefreshPolicy
26+
import com.sksamuel.elastic4s.http.ElasticClient
27+
import com.sksamuel.elastic4s.http.ElasticDsl._
28+
import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}
29+
30+
import scala.concurrent.duration._
31+
import scala.concurrent.{Await, Future}
32+
import scala.util.{Failure, Success}
33+
34+
/**
35+
* @author Hariharan.
36+
*/
37+
class ElasticActorTest extends FlatSpec with Matchers with BeforeAndAfterAll {
38+
implicit val system = ActorSystem()
39+
implicit val materializer = ActorMaterializer()(system)
40+
implicit val executionContext = system.dispatcher
41+
val configuration = new Configuration()
42+
val client = ElasticClient(configuration.elasticsearchClientUri)
43+
44+
override def beforeAll(): Unit = {
45+
client.execute {
46+
indexInto("delphi" / "project").fields(
47+
"name" -> "test:elastic-actor-test:1.0"
48+
).refresh(RefreshPolicy.IMMEDIATE)
49+
}.await
50+
}
51+
52+
53+
override def afterAll(): Unit = {
54+
client.execute {
55+
deleteByQuery("delphi", "project", matchQuery("name", "test:elastic-actor-test:1.0"))
56+
}.await
57+
client.close()
58+
system.terminate()
59+
60+
}
61+
62+
63+
"Version no.." should
64+
"match version from build.sbt" in {
65+
val res: Future[HttpResponse] = Http().singleRequest(HttpRequest(uri = "http://localhost:8080/version"))
66+
res.onComplete {
67+
case Success(ver) => {
68+
assert(ver.status.isSuccess());
69+
val res2Str: Future[String] = Unmarshal(ver.entity).to[String]
70+
res2Str.onComplete {
71+
case Success(value) => {
72+
assert(value.equals(BuildInfo.version))
73+
}
74+
case Failure(e) => {
75+
assertThrows(e);
76+
}
77+
}
78+
}
79+
case Failure(e) => {
80+
assertThrows(e);
81+
}
82+
}
83+
Await.result(res, 2.seconds)
84+
}
85+
86+
"Retrive endpoint" should
87+
"get test:elastic-actor-test:1.0 artifact" in {
88+
val mavenId = "test:elastic-actor-test:1.0"
89+
val url = s"http://localhost:8080/retrieve/${mavenId}"
90+
val res: Future[HttpResponse] = Http().singleRequest(HttpRequest(uri = url))
91+
res.onComplete {
92+
case Success(data) => {
93+
assert(data.status.isSuccess())
94+
val res2Str: Future[String] = Unmarshal(data.entity).to[String]
95+
res2Str.onComplete {
96+
case Success(value) => {
97+
assert(value.contains(mavenId))
98+
}
99+
case Failure(e) => {
100+
assertThrows(e);
101+
}
102+
}
103+
}
104+
case Failure(exception) => {
105+
assertThrows(exception)
106+
}
107+
}
108+
Await.result(res, 2.seconds)
109+
}
110+
111+
}
Lines changed: 19 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,23 @@
11
package de.upb.cs.swt.delphi.webapi
2-
32
import akka.actor.{Actor, ActorLogging, Props}
43
import com.sksamuel.elastic4s.IndexAndType
54
import com.sksamuel.elastic4s.http.{ElasticClient, RequestFailure, RequestSuccess}
65
import com.sksamuel.elastic4s.http.ElasticDsl._
76
import de.upb.cs.swt.delphi.webapi.ElasticActorManager.{Enqueue, Retrieve}
87

8+
import spray.json._
99
import scala.concurrent.ExecutionContext
1010
import scala.concurrent.duration._
1111

12-
class ElasticActor(configuration: Configuration, index: IndexAndType) extends Actor with ActorLogging{
12+
class ElasticActor(configuration: Configuration, index: IndexAndType) extends Actor with ActorLogging {
1313

1414
implicit val executionContext: ExecutionContext = context.system.dispatchers.lookup("elasticsearch-handling-dispatcher")
1515
val client = ElasticClient(configuration.elasticsearchClientUri)
1616

1717
override def preStart(): Unit = log.info("Search actor started")
18+
1819
override def postStop(): Unit = log.info("Search actor shut down")
20+
1921
context.setReceiveTimeout(2 seconds)
2022

2123
override def receive = {
@@ -25,22 +27,30 @@ class ElasticActor(configuration: Configuration, index: IndexAndType) extends Ac
2527

2628
private def getSource(id: String) = {
2729
log.info("Executing get on entry {}", id)
28-
def queryResponse = client.execute{
30+
val searchByName = searchWithType(index) query must(
31+
matchQuery("name", id)
32+
)
33+
log.info(s"Query {}",client.show(searchByName))
34+
def queryResponse = client.execute {
2935
log.info(s"Got retrieve request for $id.")
30-
searchWithType(index) query must (
31-
matchQuery("name", s"http://repo1.maven.org/maven2/:$id")
32-
)
36+
searchByName
3337
}.await
3438

3539
val source = queryResponse match {
36-
case results: RequestSuccess[_] => results.body.get
40+
case results: RequestSuccess[_] => {
41+
val resObj = results.body.get.parseJson.asJsObject
42+
val hitsObj=resObj.fields.getOrElse("hits", JsObject.empty).asJsObject
43+
val hitsArr=hitsObj.fields.getOrElse("hits",JsArray.empty).asInstanceOf[JsArray]
44+
val source=hitsArr.elements.map(m=>m.asJsObject.fields.get("_source"))
45+
source.head.getOrElse(JsObject.empty).toString()
46+
}
3747
case failure: RequestFailure => Option.empty
3848
}
3949
sender().tell(source, context.self)
4050
}
4151
}
4252

43-
object ElasticActor{
44-
def props(configuration: Configuration, index: IndexAndType) : Props = Props(new ElasticActor(configuration, index))
53+
object ElasticActor {
54+
def props(configuration: Configuration, index: IndexAndType): Props = Props(new ElasticActor(configuration, index))
4555
.withMailbox("es-priority-mailbox")
4656
}

src/main/scala/de/upb/cs/swt/delphi/webapi/Server.scala

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -25,13 +25,16 @@ object Server extends HttpApp with JsonSupport with AppLogging {
2525
implicit val timeout = Timeout(5, TimeUnit.SECONDS)
2626
implicit val materializer = ActorMaterializer()
2727

28-
2928
override def routes =
30-
path("version") { version } ~
31-
path("features") { features } ~
32-
pathPrefix("search" / Remaining) { query => search(query) } ~
33-
pathPrefix("retrieve" / Remaining) { identifier => retrieve(identifier) } ~
34-
pathPrefix("enqueue" / Remaining) { identifier => enqueue(identifier) }
29+
path("version") {
30+
version
31+
} ~
32+
path("features") {
33+
features
34+
} ~
35+
pathPrefix("search" / Remaining) { query => search(query) } ~
36+
pathPrefix("retrieve" / Remaining) { identifier => retrieve(identifier) } ~
37+
pathPrefix("enqueue" / Remaining) { identifier => enqueue(identifier) }
3538

3639

3740
private def version = {
@@ -52,11 +55,11 @@ object Server extends HttpApp with JsonSupport with AppLogging {
5255

5356
def retrieve(identifier: String) = {
5457
get {
55-
pass { //TODO: Require authentication here
58+
pass { //TODO: Require authentication here
5659
complete(
5760
(actorManager ? Retrieve(identifier)).mapTo[String]
5861
)
59-
} ~ extractClientIP{ ip =>
62+
} ~ extractClientIP { ip =>
6063
complete(
6164
(requestLimiter ? Validate(ip, Retrieve(identifier))).mapTo[String]
6265
)
@@ -66,7 +69,7 @@ object Server extends HttpApp with JsonSupport with AppLogging {
6669

6770
def enqueue(identifier: String) = {
6871
get {
69-
pass { //TODO: Require authorization here
72+
pass { //TODO: Require authorization here
7073
complete(
7174
(actorManager ? Enqueue(identifier)).mapTo[String]
7275
)

0 commit comments

Comments
 (0)