diff --git a/build.sbt b/build.sbt index 9d449eb..2fbc8c4 100644 --- a/build.sbt +++ b/build.sbt @@ -7,35 +7,40 @@ scalaVersion := "2.12.4" val akkaHttpVersion = "10.1.5" libraryDependencies += "org.parboiled" %% "parboiled" % "2.1.4" + libraryDependencies += "com.typesafe.akka" %% "akka-http" % akkaHttpVersion +libraryDependencies += "com.typesafe.akka" %% "akka-http-testkit" % akkaHttpVersion libraryDependencies += "com.typesafe.akka" %% "akka-stream" % "2.5.12" libraryDependencies += "com.typesafe.akka" %% "akka-http-spray-json" % akkaHttpVersion -libraryDependencies += "io.spray" %% "spray-json" % "1.3.3" -libraryDependencies += "org.parboiled" %% "parboiled" % "2.1.4" +libraryDependencies += "io.spray" %% "spray-json" % "1.3.3" libraryDependencies += "org.scalactic" %% "scalactic" % "3.0.4" -libraryDependencies += "org.scalatest" %% "scalatest" % "3.0.4" % "test" +libraryDependencies += "org.scalatest" %% "scalatest" % "3.0.4" % "it,test" +libraryDependencies += "org.apache.logging.log4j" % "log4j-core" % "2.11.1" val elastic4sVersion = "6.3.0" libraryDependencies ++= Seq( "com.sksamuel.elastic4s" %% "elastic4s-core" % elastic4sVersion, + // for the http client "com.sksamuel.elastic4s" %% "elastic4s-http" % elastic4sVersion, // if you want to use reactive streams "com.sksamuel.elastic4s" %% "elastic4s-http-streams" % elastic4sVersion, - // testing - "com.sksamuel.elastic4s" %% "elastic4s-testkit" % elastic4sVersion % "test", - "com.sksamuel.elastic4s" %% "elastic4s-embedded" % elastic4sVersion % "test" ) lazy val webapi = (project in file(".")). + //https://www.scala-sbt.org/1.x/docs/Testing.html +configs(IntegrationTest). + settings( + Defaults.itSettings, + ). enablePlugins(JavaAppPackaging). enablePlugins(DockerPlugin). enablePlugins(ScalastylePlugin). - settings ( + settings( dockerBaseImage := "openjdk:jre-alpine" ). enablePlugins(AshScriptPlugin). diff --git a/src/it/scala/de/upb/cs/swt/delphi/webapi/ElasticActorTest.scala b/src/it/scala/de/upb/cs/swt/delphi/webapi/ElasticActorTest.scala new file mode 100644 index 0000000..e144534 --- /dev/null +++ b/src/it/scala/de/upb/cs/swt/delphi/webapi/ElasticActorTest.scala @@ -0,0 +1,111 @@ + +// Copyright (C) 2018 The Delphi Team. +// See the LICENCE file distributed with this work for additional +// information regarding copyright ownership. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at + +// http://www.apache.org/licenses/LICENSE-2.0 + +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package de.upb.cs.swt.delphi.webapi + +import akka.actor.ActorSystem +import akka.http.scaladsl.Http +import akka.http.scaladsl.model.{HttpRequest, HttpResponse} +import akka.http.scaladsl.unmarshalling.Unmarshal +import akka.stream.ActorMaterializer +import com.sksamuel.elastic4s.RefreshPolicy +import com.sksamuel.elastic4s.http.ElasticClient +import com.sksamuel.elastic4s.http.ElasticDsl._ +import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers} + +import scala.concurrent.duration._ +import scala.concurrent.{Await, Future} +import scala.util.{Failure, Success} + +/** + * @author Hariharan. + */ +class ElasticActorTest extends FlatSpec with Matchers with BeforeAndAfterAll { + implicit val system = ActorSystem() + implicit val materializer = ActorMaterializer()(system) + implicit val executionContext = system.dispatcher + val configuration = new Configuration() + val client = ElasticClient(configuration.elasticsearchClientUri) + + override def beforeAll(): Unit = { + client.execute { + indexInto("delphi" / "project").fields( + "name" -> "test:elastic-actor-test:1.0" + ).refresh(RefreshPolicy.IMMEDIATE) + }.await + } + + + override def afterAll(): Unit = { + client.execute { + deleteByQuery("delphi", "project", matchQuery("name", "test:elastic-actor-test:1.0")) + }.await + client.close() + system.terminate() + + } + + + "Version no.." should + "match version from build.sbt" in { + val res: Future[HttpResponse] = Http().singleRequest(HttpRequest(uri = "http://localhost:8080/version")) + res.onComplete { + case Success(ver) => { + assert(ver.status.isSuccess()); + val res2Str: Future[String] = Unmarshal(ver.entity).to[String] + res2Str.onComplete { + case Success(value) => { + assert(value.equals(BuildInfo.version)) + } + case Failure(e) => { + assertThrows(e); + } + } + } + case Failure(e) => { + assertThrows(e); + } + } + Await.result(res, 2.seconds) + } + + "Retrive endpoint" should + "get test:elastic-actor-test:1.0 artifact" in { + val mavenId = "test:elastic-actor-test:1.0" + val url = s"http://localhost:8080/retrieve/${mavenId}" + val res: Future[HttpResponse] = Http().singleRequest(HttpRequest(uri = url)) + res.onComplete { + case Success(data) => { + assert(data.status.isSuccess()) + val res2Str: Future[String] = Unmarshal(data.entity).to[String] + res2Str.onComplete { + case Success(value) => { + assert(value.contains(mavenId)) + } + case Failure(e) => { + assertThrows(e); + } + } + } + case Failure(exception) => { + assertThrows(exception) + } + } + Await.result(res, 2.seconds) + } + +} diff --git a/src/main/scala/de/upb/cs/swt/delphi/webapi/ElasticActor.scala b/src/main/scala/de/upb/cs/swt/delphi/webapi/ElasticActor.scala index c4d4dcd..2542d81 100644 --- a/src/main/scala/de/upb/cs/swt/delphi/webapi/ElasticActor.scala +++ b/src/main/scala/de/upb/cs/swt/delphi/webapi/ElasticActor.scala @@ -1,21 +1,23 @@ package de.upb.cs.swt.delphi.webapi - import akka.actor.{Actor, ActorLogging, Props} import com.sksamuel.elastic4s.IndexAndType import com.sksamuel.elastic4s.http.{ElasticClient, RequestFailure, RequestSuccess} import com.sksamuel.elastic4s.http.ElasticDsl._ import de.upb.cs.swt.delphi.webapi.ElasticActorManager.{Enqueue, Retrieve} +import spray.json._ import scala.concurrent.ExecutionContext import scala.concurrent.duration._ -class ElasticActor(configuration: Configuration, index: IndexAndType) extends Actor with ActorLogging{ +class ElasticActor(configuration: Configuration, index: IndexAndType) extends Actor with ActorLogging { implicit val executionContext: ExecutionContext = context.system.dispatchers.lookup("elasticsearch-handling-dispatcher") val client = ElasticClient(configuration.elasticsearchClientUri) override def preStart(): Unit = log.info("Search actor started") + override def postStop(): Unit = log.info("Search actor shut down") + context.setReceiveTimeout(2 seconds) override def receive = { @@ -25,22 +27,30 @@ class ElasticActor(configuration: Configuration, index: IndexAndType) extends Ac private def getSource(id: String) = { log.info("Executing get on entry {}", id) - def queryResponse = client.execute{ + val searchByName = searchWithType(index) query must( + matchQuery("name", id) + ) + log.info(s"Query {}",client.show(searchByName)) + def queryResponse = client.execute { log.info(s"Got retrieve request for $id.") - searchWithType(index) query must ( - matchQuery("name", s"http://repo1.maven.org/maven2/:$id") - ) + searchByName }.await val source = queryResponse match { - case results: RequestSuccess[_] => results.body.get + case results: RequestSuccess[_] => { + val resObj = results.body.get.parseJson.asJsObject + val hitsObj=resObj.fields.getOrElse("hits", JsObject.empty).asJsObject + val hitsArr=hitsObj.fields.getOrElse("hits",JsArray.empty).asInstanceOf[JsArray] + val source=hitsArr.elements.map(m=>m.asJsObject.fields.get("_source")) + source.head.getOrElse(JsObject.empty).toString() + } case failure: RequestFailure => Option.empty } sender().tell(source, context.self) } } -object ElasticActor{ - def props(configuration: Configuration, index: IndexAndType) : Props = Props(new ElasticActor(configuration, index)) +object ElasticActor { + def props(configuration: Configuration, index: IndexAndType): Props = Props(new ElasticActor(configuration, index)) .withMailbox("es-priority-mailbox") } diff --git a/src/main/scala/de/upb/cs/swt/delphi/webapi/Server.scala b/src/main/scala/de/upb/cs/swt/delphi/webapi/Server.scala index 02e72ee..3315520 100644 --- a/src/main/scala/de/upb/cs/swt/delphi/webapi/Server.scala +++ b/src/main/scala/de/upb/cs/swt/delphi/webapi/Server.scala @@ -22,12 +22,16 @@ object Server extends HttpApp with JsonSupport with AppLogging { private val requestLimiter = system.actorOf(ElasticRequestLimiter.props(configuration, actorManager)) implicit val timeout = Timeout(5, TimeUnit.SECONDS) - override def routes = - path("version") { version } ~ - path("features") { features } ~ - pathPrefix("search" / Remaining) { query => search(query) } ~ - pathPrefix("retrieve" / Remaining) { identifier => retrieve(identifier) } ~ - pathPrefix("enqueue" / Remaining) { identifier => enqueue(identifier) } + override def routes = + path("version") { + version + } ~ + path("features") { + features + } ~ + pathPrefix("search" / Remaining) { query => search(query) } ~ + pathPrefix("retrieve" / Remaining) { identifier => retrieve(identifier) } ~ + pathPrefix("enqueue" / Remaining) { identifier => enqueue(identifier) } private def version = { @@ -48,11 +52,11 @@ object Server extends HttpApp with JsonSupport with AppLogging { def retrieve(identifier: String) = { get { - pass { //TODO: Require authentication here + pass { //TODO: Require authentication here complete( (actorManager ? Retrieve(identifier)).mapTo[String] ) - } ~ extractClientIP{ ip => + } ~ extractClientIP { ip => complete( (requestLimiter ? Validate(ip, Retrieve(identifier))).mapTo[String] ) @@ -62,7 +66,7 @@ object Server extends HttpApp with JsonSupport with AppLogging { def enqueue(identifier: String) = { get { - pass { //TODO: Require authorization here + pass { //TODO: Require authorization here complete( (actorManager ? Enqueue(identifier)).mapTo[String] )