From 778090d1049c0e2d3f1f033be76eacf2ffb7e36c Mon Sep 17 00:00:00 2001 From: Hariharan Ramanathan Date: Tue, 14 Aug 2018 21:19:05 +0200 Subject: [PATCH 1/5] Extracting source object from es response --- .../cs/swt/delphi/webapi/ElasticActor.scala | 28 +++++++++++++------ 1 file changed, 20 insertions(+), 8 deletions(-) diff --git a/src/main/scala/de/upb/cs/swt/delphi/webapi/ElasticActor.scala b/src/main/scala/de/upb/cs/swt/delphi/webapi/ElasticActor.scala index c4d4dcd..f94e2a3 100644 --- a/src/main/scala/de/upb/cs/swt/delphi/webapi/ElasticActor.scala +++ b/src/main/scala/de/upb/cs/swt/delphi/webapi/ElasticActor.scala @@ -6,16 +6,19 @@ import com.sksamuel.elastic4s.http.{ElasticClient, RequestFailure, RequestSucces import com.sksamuel.elastic4s.http.ElasticDsl._ import de.upb.cs.swt.delphi.webapi.ElasticActorManager.{Enqueue, Retrieve} +import spray.json._ import scala.concurrent.ExecutionContext import scala.concurrent.duration._ -class ElasticActor(configuration: Configuration, index: IndexAndType) extends Actor with ActorLogging{ +class ElasticActor(configuration: Configuration, index: IndexAndType) extends Actor with ActorLogging { implicit val executionContext: ExecutionContext = context.system.dispatchers.lookup("elasticsearch-handling-dispatcher") val client = ElasticClient(configuration.elasticsearchClientUri) override def preStart(): Unit = log.info("Search actor started") + override def postStop(): Unit = log.info("Search actor shut down") + context.setReceiveTimeout(2 seconds) override def receive = { @@ -25,22 +28,31 @@ class ElasticActor(configuration: Configuration, index: IndexAndType) extends Ac private def getSource(id: String) = { log.info("Executing get on entry {}", id) - def queryResponse = client.execute{ + val searchByName = searchWithType(index) query must( + // matchQuery("name", s"http://repo1.maven.org/maven2/:$id") + matchQuery("name", id) + ) + log.info(s"Query {}",client.show(searchByName)) + def queryResponse = client.execute { log.info(s"Got retrieve request for $id.") - searchWithType(index) query must ( - matchQuery("name", s"http://repo1.maven.org/maven2/:$id") - ) + searchByName }.await val source = queryResponse match { - case results: RequestSuccess[_] => results.body.get + case results: RequestSuccess[_] => { + val resObj = results.body.get.parseJson.asJsObject + val hitsObj=resObj.fields.getOrElse("hits", JsObject.empty).asJsObject + val hitsArr=hitsObj.fields.getOrElse("hits",JsArray.empty).asInstanceOf[JsArray] + val source=hitsArr.elements.map(m=>m.asJsObject.fields.get("_source")) + source.head.getOrElse(JsObject.empty).toString() + } case failure: RequestFailure => Option.empty } sender().tell(source, context.self) } } -object ElasticActor{ - def props(configuration: Configuration, index: IndexAndType) : Props = Props(new ElasticActor(configuration, index)) +object ElasticActor { + def props(configuration: Configuration, index: IndexAndType): Props = Props(new ElasticActor(configuration, index)) .withMailbox("es-priority-mailbox") } From 388ddafb02c04c94b69eb506332af2ab0c381e03 Mon Sep 17 00:00:00 2001 From: Hariharan Ramanathan Date: Thu, 4 Oct 2018 20:52:30 +0200 Subject: [PATCH 2/5] Added elastic actor test --- build.sbt | 6 +- .../cs/swt/delphi/webapi/ElasticActor.scala | 2 - .../swt/delphi/webapi/ElasticActorTest.scala | 113 ++++++++++++++++++ 3 files changed, 117 insertions(+), 4 deletions(-) create mode 100644 src/test/scala/de/upb/cs/swt/delphi/webapi/ElasticActorTest.scala diff --git a/build.sbt b/build.sbt index c9ab5ec..9bd713c 100644 --- a/build.sbt +++ b/build.sbt @@ -6,12 +6,14 @@ scalaVersion := "2.12.4" libraryDependencies += "org.parboiled" %% "parboiled" % "2.1.4" libraryDependencies += "com.typesafe.akka" %% "akka-http" % "10.0.11" +libraryDependencies += "com.typesafe.akka" %% "akka-http-testkit" % "10.0.11" libraryDependencies += "com.typesafe.akka" %% "akka-stream" % "2.5.12" libraryDependencies += "com.typesafe.akka" %% "akka-http-spray-json" % "10.1.1" -libraryDependencies += "io.spray" %% "spray-json" % "1.3.3" +libraryDependencies += "io.spray" %% "spray-json" % "1.3.3" libraryDependencies += "org.parboiled" %% "parboiled" % "2.1.4" libraryDependencies += "org.scalactic" %% "scalactic" % "3.0.4" libraryDependencies += "org.scalatest" %% "scalatest" % "3.0.4" % "test" +libraryDependencies += "org.apache.logging.log4j" % "log4j-core" % "2.11.1" val elastic4sVersion = "6.3.0" libraryDependencies ++= Seq( @@ -33,7 +35,7 @@ lazy val webapi = (project in file(".")). enablePlugins(JavaAppPackaging). enablePlugins(DockerPlugin). enablePlugins(ScalastylePlugin). - settings ( + settings( dockerBaseImage := "openjdk:jre-alpine" ). enablePlugins(AshScriptPlugin). diff --git a/src/main/scala/de/upb/cs/swt/delphi/webapi/ElasticActor.scala b/src/main/scala/de/upb/cs/swt/delphi/webapi/ElasticActor.scala index f94e2a3..2542d81 100644 --- a/src/main/scala/de/upb/cs/swt/delphi/webapi/ElasticActor.scala +++ b/src/main/scala/de/upb/cs/swt/delphi/webapi/ElasticActor.scala @@ -1,5 +1,4 @@ package de.upb.cs.swt.delphi.webapi - import akka.actor.{Actor, ActorLogging, Props} import com.sksamuel.elastic4s.IndexAndType import com.sksamuel.elastic4s.http.{ElasticClient, RequestFailure, RequestSuccess} @@ -29,7 +28,6 @@ class ElasticActor(configuration: Configuration, index: IndexAndType) extends Ac private def getSource(id: String) = { log.info("Executing get on entry {}", id) val searchByName = searchWithType(index) query must( - // matchQuery("name", s"http://repo1.maven.org/maven2/:$id") matchQuery("name", id) ) log.info(s"Query {}",client.show(searchByName)) diff --git a/src/test/scala/de/upb/cs/swt/delphi/webapi/ElasticActorTest.scala b/src/test/scala/de/upb/cs/swt/delphi/webapi/ElasticActorTest.scala new file mode 100644 index 0000000..d85e320 --- /dev/null +++ b/src/test/scala/de/upb/cs/swt/delphi/webapi/ElasticActorTest.scala @@ -0,0 +1,113 @@ + +// Copyright (C) 2018 The Delphi Team. +// See the LICENCE file distributed with this work for additional +// information regarding copyright ownership. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at + +// http://www.apache.org/licenses/LICENSE-2.0 + +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package de.upb.cs.swt.delphi.webapi + +import akka.actor.ActorSystem +import akka.http.scaladsl.Http +import akka.http.scaladsl.model.{HttpRequest, HttpResponse} +import akka.http.scaladsl.unmarshalling.Unmarshal +import akka.stream.ActorMaterializer +import com.sksamuel.elastic4s.RefreshPolicy +import com.sksamuel.elastic4s.embedded.LocalNode +import com.sksamuel.elastic4s.http.ElasticDsl._ +import org.elasticsearch.common.settings.Settings +import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers} + +import scala.concurrent.duration._ +import scala.concurrent.{Await, Future} +import scala.util.{Failure, Success} + +/** + * @author Hariharan. + */ +class ElasticActorTest extends FlatSpec with Matchers with BeforeAndAfterAll { + implicit val system = ActorSystem() + implicit val materializer = ActorMaterializer()(system) + implicit val executionContext = system.dispatcher + val node = LocalNode("elasticsearch","local-path") + val client = node.client(shutdownNodeOnClose = true) + + override def beforeAll(): Unit = { + client.execute { + createIndex("delphi").mappings( + mapping("project").fields( + keywordField("name"), + keywordField("source"), + keywordField("identifier.groupId"), + keywordField("identifier.artifactId"), + keywordField("identifier.version") + ) + ) + }.await + + client.execute { + indexInto("delphi" / "project").fields( + "name" -> "yom:yom:1.0-alpha-2" + ).refresh(RefreshPolicy.IMMEDIATE) + }.await + } + + override def afterAll(): Unit = { + super.afterAll() + client.close() + system.terminate(); + } + + + + "Version no.." should + "match version from build.sbt" in { + val res: Future[HttpResponse] = Http().singleRequest(HttpRequest(uri = "http://localhost:8080/version")) + res.onComplete { + case Success(ver) => { + assert(ver.status.isSuccess()); + val res2Str: Future[String] = Unmarshal(ver.entity).to[String] + res2Str.onComplete { + case Success(value) => { + assert(value.equals(BuildInfo.version)) + } + case Failure(e) => { + assertThrows(e); + } + } + } + case Failure(e) => { + assertThrows(e); + } + } + Await.result(res, 5.seconds) + } + + "Retrive endpoint" should + "get yom:yom:1.0-alpha-2 artifact" in { + val mavenId = "yom:yom:1.0-alpha-2" + val url = s"http://localhost:8080/retrieve/${mavenId}" + val res: Future[HttpResponse] = Http().singleRequest(HttpRequest(uri = url)) + res.onComplete { + case Success(data) => { + assert(data.status.isSuccess()) + val res2Str: Future[String] = Unmarshal(data.entity).to[String] + println(res2Str) + } + case Failure(exception) => { + assertThrows(exception) + } + } + } + +} From 55b14a808c034173e2a8d5ade67ce9179ef2dcc0 Mon Sep 17 00:00:00 2001 From: Hariharan Ramanathan Date: Thu, 4 Oct 2018 22:17:58 +0200 Subject: [PATCH 3/5] Updated elastic actor test with mock data --- build.sbt | 4 +- .../de/upb/cs/swt/delphi/webapi/Server.scala | 22 +++++---- .../swt/delphi/webapi/ElasticActorTest.scala | 46 +++++++++---------- 3 files changed, 36 insertions(+), 36 deletions(-) diff --git a/build.sbt b/build.sbt index 9bd713c..895db2a 100644 --- a/build.sbt +++ b/build.sbt @@ -19,15 +19,13 @@ val elastic4sVersion = "6.3.0" libraryDependencies ++= Seq( "com.sksamuel.elastic4s" %% "elastic4s-core" % elastic4sVersion, + // for the http client "com.sksamuel.elastic4s" %% "elastic4s-http" % elastic4sVersion, // if you want to use reactive streams "com.sksamuel.elastic4s" %% "elastic4s-http-streams" % elastic4sVersion, - // testing - "com.sksamuel.elastic4s" %% "elastic4s-testkit" % elastic4sVersion % "test", - "com.sksamuel.elastic4s" %% "elastic4s-embedded" % elastic4sVersion % "test" ) diff --git a/src/main/scala/de/upb/cs/swt/delphi/webapi/Server.scala b/src/main/scala/de/upb/cs/swt/delphi/webapi/Server.scala index 02e72ee..3315520 100644 --- a/src/main/scala/de/upb/cs/swt/delphi/webapi/Server.scala +++ b/src/main/scala/de/upb/cs/swt/delphi/webapi/Server.scala @@ -22,12 +22,16 @@ object Server extends HttpApp with JsonSupport with AppLogging { private val requestLimiter = system.actorOf(ElasticRequestLimiter.props(configuration, actorManager)) implicit val timeout = Timeout(5, TimeUnit.SECONDS) - override def routes = - path("version") { version } ~ - path("features") { features } ~ - pathPrefix("search" / Remaining) { query => search(query) } ~ - pathPrefix("retrieve" / Remaining) { identifier => retrieve(identifier) } ~ - pathPrefix("enqueue" / Remaining) { identifier => enqueue(identifier) } + override def routes = + path("version") { + version + } ~ + path("features") { + features + } ~ + pathPrefix("search" / Remaining) { query => search(query) } ~ + pathPrefix("retrieve" / Remaining) { identifier => retrieve(identifier) } ~ + pathPrefix("enqueue" / Remaining) { identifier => enqueue(identifier) } private def version = { @@ -48,11 +52,11 @@ object Server extends HttpApp with JsonSupport with AppLogging { def retrieve(identifier: String) = { get { - pass { //TODO: Require authentication here + pass { //TODO: Require authentication here complete( (actorManager ? Retrieve(identifier)).mapTo[String] ) - } ~ extractClientIP{ ip => + } ~ extractClientIP { ip => complete( (requestLimiter ? Validate(ip, Retrieve(identifier))).mapTo[String] ) @@ -62,7 +66,7 @@ object Server extends HttpApp with JsonSupport with AppLogging { def enqueue(identifier: String) = { get { - pass { //TODO: Require authorization here + pass { //TODO: Require authorization here complete( (actorManager ? Enqueue(identifier)).mapTo[String] ) diff --git a/src/test/scala/de/upb/cs/swt/delphi/webapi/ElasticActorTest.scala b/src/test/scala/de/upb/cs/swt/delphi/webapi/ElasticActorTest.scala index d85e320..e144534 100644 --- a/src/test/scala/de/upb/cs/swt/delphi/webapi/ElasticActorTest.scala +++ b/src/test/scala/de/upb/cs/swt/delphi/webapi/ElasticActorTest.scala @@ -23,9 +23,8 @@ import akka.http.scaladsl.model.{HttpRequest, HttpResponse} import akka.http.scaladsl.unmarshalling.Unmarshal import akka.stream.ActorMaterializer import com.sksamuel.elastic4s.RefreshPolicy -import com.sksamuel.elastic4s.embedded.LocalNode +import com.sksamuel.elastic4s.http.ElasticClient import com.sksamuel.elastic4s.http.ElasticDsl._ -import org.elasticsearch.common.settings.Settings import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers} import scala.concurrent.duration._ @@ -39,35 +38,26 @@ class ElasticActorTest extends FlatSpec with Matchers with BeforeAndAfterAll { implicit val system = ActorSystem() implicit val materializer = ActorMaterializer()(system) implicit val executionContext = system.dispatcher - val node = LocalNode("elasticsearch","local-path") - val client = node.client(shutdownNodeOnClose = true) + val configuration = new Configuration() + val client = ElasticClient(configuration.elasticsearchClientUri) override def beforeAll(): Unit = { - client.execute { - createIndex("delphi").mappings( - mapping("project").fields( - keywordField("name"), - keywordField("source"), - keywordField("identifier.groupId"), - keywordField("identifier.artifactId"), - keywordField("identifier.version") - ) - ) - }.await - client.execute { indexInto("delphi" / "project").fields( - "name" -> "yom:yom:1.0-alpha-2" + "name" -> "test:elastic-actor-test:1.0" ).refresh(RefreshPolicy.IMMEDIATE) }.await } + override def afterAll(): Unit = { - super.afterAll() + client.execute { + deleteByQuery("delphi", "project", matchQuery("name", "test:elastic-actor-test:1.0")) + }.await client.close() - system.terminate(); - } + system.terminate() + } "Version no.." should @@ -90,24 +80,32 @@ class ElasticActorTest extends FlatSpec with Matchers with BeforeAndAfterAll { assertThrows(e); } } - Await.result(res, 5.seconds) + Await.result(res, 2.seconds) } "Retrive endpoint" should - "get yom:yom:1.0-alpha-2 artifact" in { - val mavenId = "yom:yom:1.0-alpha-2" + "get test:elastic-actor-test:1.0 artifact" in { + val mavenId = "test:elastic-actor-test:1.0" val url = s"http://localhost:8080/retrieve/${mavenId}" val res: Future[HttpResponse] = Http().singleRequest(HttpRequest(uri = url)) res.onComplete { case Success(data) => { assert(data.status.isSuccess()) val res2Str: Future[String] = Unmarshal(data.entity).to[String] - println(res2Str) + res2Str.onComplete { + case Success(value) => { + assert(value.contains(mavenId)) + } + case Failure(e) => { + assertThrows(e); + } + } } case Failure(exception) => { assertThrows(exception) } } + Await.result(res, 2.seconds) } } From 4743facda5b9f01e33426d4f0da2b2991b6863b9 Mon Sep 17 00:00:00 2001 From: Hariharan Ramanathan Date: Fri, 5 Oct 2018 02:47:34 +0200 Subject: [PATCH 4/5] Moved ElasticActorTest to integration test --- build.sbt | 7 ++++++- .../de/upb/cs/swt/delphi/webapi/ElasticActorTest.scala | 0 2 files changed, 6 insertions(+), 1 deletion(-) rename src/{test => it}/scala/de/upb/cs/swt/delphi/webapi/ElasticActorTest.scala (100%) diff --git a/build.sbt b/build.sbt index 836107a..fb8b2fb 100644 --- a/build.sbt +++ b/build.sbt @@ -14,7 +14,7 @@ libraryDependencies += "com.typesafe.akka" %% "akka-stream" % "2.5.12" libraryDependencies += "com.typesafe.akka" %% "akka-http-spray-json" % "10.1.1" libraryDependencies += "io.spray" %% "spray-json" % "1.3.3" libraryDependencies += "org.scalactic" %% "scalactic" % "3.0.4" -libraryDependencies += "org.scalatest" %% "scalatest" % "3.0.4" % "test" +libraryDependencies += "org.scalatest" %% "scalatest" % "3.0.4" % "it,test" libraryDependencies += "org.apache.logging.log4j" % "log4j-core" % "2.11.1" val elastic4sVersion = "6.3.0" @@ -32,6 +32,11 @@ libraryDependencies ++= Seq( lazy val webapi = (project in file(".")). + //https://www.scala-sbt.org/1.x/docs/Testing.html +configs(IntegrationTest). + settings( + Defaults.itSettings, + ). enablePlugins(JavaAppPackaging). enablePlugins(DockerPlugin). enablePlugins(ScalastylePlugin). diff --git a/src/test/scala/de/upb/cs/swt/delphi/webapi/ElasticActorTest.scala b/src/it/scala/de/upb/cs/swt/delphi/webapi/ElasticActorTest.scala similarity index 100% rename from src/test/scala/de/upb/cs/swt/delphi/webapi/ElasticActorTest.scala rename to src/it/scala/de/upb/cs/swt/delphi/webapi/ElasticActorTest.scala From f0fa33ef59dcf4669029e150cde46b77919bc96d Mon Sep 17 00:00:00 2001 From: Hariharan Ramanathan Date: Fri, 5 Oct 2018 21:04:03 +0200 Subject: [PATCH 5/5] Reverting to latest akka http version --- build.sbt | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/build.sbt b/build.sbt index fb8b2fb..2fbc8c4 100644 --- a/build.sbt +++ b/build.sbt @@ -8,10 +8,10 @@ val akkaHttpVersion = "10.1.5" libraryDependencies += "org.parboiled" %% "parboiled" % "2.1.4" -libraryDependencies += "com.typesafe.akka" %% "akka-http" % "10.0.11" -libraryDependencies += "com.typesafe.akka" %% "akka-http-testkit" % "10.0.11" +libraryDependencies += "com.typesafe.akka" %% "akka-http" % akkaHttpVersion +libraryDependencies += "com.typesafe.akka" %% "akka-http-testkit" % akkaHttpVersion libraryDependencies += "com.typesafe.akka" %% "akka-stream" % "2.5.12" -libraryDependencies += "com.typesafe.akka" %% "akka-http-spray-json" % "10.1.1" +libraryDependencies += "com.typesafe.akka" %% "akka-http-spray-json" % akkaHttpVersion libraryDependencies += "io.spray" %% "spray-json" % "1.3.3" libraryDependencies += "org.scalactic" %% "scalactic" % "3.0.4" libraryDependencies += "org.scalatest" %% "scalatest" % "3.0.4" % "it,test"