Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 12 additions & 7 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -7,35 +7,40 @@ scalaVersion := "2.12.4"
val akkaHttpVersion = "10.1.5"

libraryDependencies += "org.parboiled" %% "parboiled" % "2.1.4"

libraryDependencies += "com.typesafe.akka" %% "akka-http" % akkaHttpVersion
libraryDependencies += "com.typesafe.akka" %% "akka-http-testkit" % akkaHttpVersion
libraryDependencies += "com.typesafe.akka" %% "akka-stream" % "2.5.12"
libraryDependencies += "com.typesafe.akka" %% "akka-http-spray-json" % akkaHttpVersion
libraryDependencies += "io.spray" %% "spray-json" % "1.3.3"
libraryDependencies += "org.parboiled" %% "parboiled" % "2.1.4"
libraryDependencies += "io.spray" %% "spray-json" % "1.3.3"
libraryDependencies += "org.scalactic" %% "scalactic" % "3.0.4"
libraryDependencies += "org.scalatest" %% "scalatest" % "3.0.4" % "test"
libraryDependencies += "org.scalatest" %% "scalatest" % "3.0.4" % "it,test"
libraryDependencies += "org.apache.logging.log4j" % "log4j-core" % "2.11.1"

val elastic4sVersion = "6.3.0"
libraryDependencies ++= Seq(
"com.sksamuel.elastic4s" %% "elastic4s-core" % elastic4sVersion,


// for the http client
"com.sksamuel.elastic4s" %% "elastic4s-http" % elastic4sVersion,

// if you want to use reactive streams
"com.sksamuel.elastic4s" %% "elastic4s-http-streams" % elastic4sVersion,

// testing
"com.sksamuel.elastic4s" %% "elastic4s-testkit" % elastic4sVersion % "test",
"com.sksamuel.elastic4s" %% "elastic4s-embedded" % elastic4sVersion % "test"
)


lazy val webapi = (project in file(".")).
//https://www.scala-sbt.org/1.x/docs/Testing.html
configs(IntegrationTest).
settings(
Defaults.itSettings,
).
enablePlugins(JavaAppPackaging).
enablePlugins(DockerPlugin).
enablePlugins(ScalastylePlugin).
settings (
settings(
dockerBaseImage := "openjdk:jre-alpine"
).
enablePlugins(AshScriptPlugin).
Expand Down
111 changes: 111 additions & 0 deletions src/it/scala/de/upb/cs/swt/delphi/webapi/ElasticActorTest.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@

// Copyright (C) 2018 The Delphi Team.
// See the LICENCE file distributed with this work for additional
// information regarding copyright ownership.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at

// http://www.apache.org/licenses/LICENSE-2.0

// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package de.upb.cs.swt.delphi.webapi

import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.{HttpRequest, HttpResponse}
import akka.http.scaladsl.unmarshalling.Unmarshal
import akka.stream.ActorMaterializer
import com.sksamuel.elastic4s.RefreshPolicy
import com.sksamuel.elastic4s.http.ElasticClient
import com.sksamuel.elastic4s.http.ElasticDsl._
import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}

import scala.concurrent.duration._
import scala.concurrent.{Await, Future}
import scala.util.{Failure, Success}

/**
* @author Hariharan.
*/
class ElasticActorTest extends FlatSpec with Matchers with BeforeAndAfterAll {
implicit val system = ActorSystem()
implicit val materializer = ActorMaterializer()(system)
implicit val executionContext = system.dispatcher
val configuration = new Configuration()
val client = ElasticClient(configuration.elasticsearchClientUri)

override def beforeAll(): Unit = {
client.execute {
indexInto("delphi" / "project").fields(
"name" -> "test:elastic-actor-test:1.0"
).refresh(RefreshPolicy.IMMEDIATE)
}.await
}


override def afterAll(): Unit = {
client.execute {
deleteByQuery("delphi", "project", matchQuery("name", "test:elastic-actor-test:1.0"))
}.await
client.close()
system.terminate()

}


"Version no.." should
"match version from build.sbt" in {
val res: Future[HttpResponse] = Http().singleRequest(HttpRequest(uri = "http://localhost:8080/version"))
res.onComplete {
case Success(ver) => {
assert(ver.status.isSuccess());
val res2Str: Future[String] = Unmarshal(ver.entity).to[String]
res2Str.onComplete {
case Success(value) => {
assert(value.equals(BuildInfo.version))
}
case Failure(e) => {
assertThrows(e);
}
}
}
case Failure(e) => {
assertThrows(e);
}
}
Await.result(res, 2.seconds)
}

"Retrive endpoint" should
"get test:elastic-actor-test:1.0 artifact" in {
val mavenId = "test:elastic-actor-test:1.0"
val url = s"http://localhost:8080/retrieve/${mavenId}"
val res: Future[HttpResponse] = Http().singleRequest(HttpRequest(uri = url))
res.onComplete {
case Success(data) => {
assert(data.status.isSuccess())
val res2Str: Future[String] = Unmarshal(data.entity).to[String]
res2Str.onComplete {
case Success(value) => {
assert(value.contains(mavenId))
}
case Failure(e) => {
assertThrows(e);
}
}
}
case Failure(exception) => {
assertThrows(exception)
}
}
Await.result(res, 2.seconds)
}

}
28 changes: 19 additions & 9 deletions src/main/scala/de/upb/cs/swt/delphi/webapi/ElasticActor.scala
Original file line number Diff line number Diff line change
@@ -1,21 +1,23 @@
package de.upb.cs.swt.delphi.webapi

import akka.actor.{Actor, ActorLogging, Props}
import com.sksamuel.elastic4s.IndexAndType
import com.sksamuel.elastic4s.http.{ElasticClient, RequestFailure, RequestSuccess}
import com.sksamuel.elastic4s.http.ElasticDsl._
import de.upb.cs.swt.delphi.webapi.ElasticActorManager.{Enqueue, Retrieve}

import spray.json._
import scala.concurrent.ExecutionContext
import scala.concurrent.duration._

class ElasticActor(configuration: Configuration, index: IndexAndType) extends Actor with ActorLogging{
class ElasticActor(configuration: Configuration, index: IndexAndType) extends Actor with ActorLogging {

implicit val executionContext: ExecutionContext = context.system.dispatchers.lookup("elasticsearch-handling-dispatcher")
val client = ElasticClient(configuration.elasticsearchClientUri)

override def preStart(): Unit = log.info("Search actor started")

override def postStop(): Unit = log.info("Search actor shut down")

context.setReceiveTimeout(2 seconds)

override def receive = {
Expand All @@ -25,22 +27,30 @@ class ElasticActor(configuration: Configuration, index: IndexAndType) extends Ac

private def getSource(id: String) = {
log.info("Executing get on entry {}", id)
def queryResponse = client.execute{
val searchByName = searchWithType(index) query must(
matchQuery("name", id)
)
log.info(s"Query {}",client.show(searchByName))
def queryResponse = client.execute {
log.info(s"Got retrieve request for $id.")
searchWithType(index) query must (
matchQuery("name", s"http://repo1.maven.org/maven2/:$id")
)
searchByName
}.await

val source = queryResponse match {
case results: RequestSuccess[_] => results.body.get
case results: RequestSuccess[_] => {
val resObj = results.body.get.parseJson.asJsObject
val hitsObj=resObj.fields.getOrElse("hits", JsObject.empty).asJsObject
val hitsArr=hitsObj.fields.getOrElse("hits",JsArray.empty).asInstanceOf[JsArray]
val source=hitsArr.elements.map(m=>m.asJsObject.fields.get("_source"))
source.head.getOrElse(JsObject.empty).toString()
}
case failure: RequestFailure => Option.empty
}
sender().tell(source, context.self)
}
}

object ElasticActor{
def props(configuration: Configuration, index: IndexAndType) : Props = Props(new ElasticActor(configuration, index))
object ElasticActor {
def props(configuration: Configuration, index: IndexAndType): Props = Props(new ElasticActor(configuration, index))
.withMailbox("es-priority-mailbox")
}
22 changes: 13 additions & 9 deletions src/main/scala/de/upb/cs/swt/delphi/webapi/Server.scala
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,16 @@ object Server extends HttpApp with JsonSupport with AppLogging {
private val requestLimiter = system.actorOf(ElasticRequestLimiter.props(configuration, actorManager))
implicit val timeout = Timeout(5, TimeUnit.SECONDS)

override def routes =
path("version") { version } ~
path("features") { features } ~
pathPrefix("search" / Remaining) { query => search(query) } ~
pathPrefix("retrieve" / Remaining) { identifier => retrieve(identifier) } ~
pathPrefix("enqueue" / Remaining) { identifier => enqueue(identifier) }
override def routes =
path("version") {
version
} ~
path("features") {
features
} ~
pathPrefix("search" / Remaining) { query => search(query) } ~
pathPrefix("retrieve" / Remaining) { identifier => retrieve(identifier) } ~
pathPrefix("enqueue" / Remaining) { identifier => enqueue(identifier) }


private def version = {
Expand All @@ -48,11 +52,11 @@ object Server extends HttpApp with JsonSupport with AppLogging {

def retrieve(identifier: String) = {
get {
pass { //TODO: Require authentication here
pass { //TODO: Require authentication here
complete(
(actorManager ? Retrieve(identifier)).mapTo[String]
)
} ~ extractClientIP{ ip =>
} ~ extractClientIP { ip =>
complete(
(requestLimiter ? Validate(ip, Retrieve(identifier))).mapTo[String]
)
Expand All @@ -62,7 +66,7 @@ object Server extends HttpApp with JsonSupport with AppLogging {

def enqueue(identifier: String) = {
get {
pass { //TODO: Require authorization here
pass { //TODO: Require authorization here
complete(
(actorManager ? Enqueue(identifier)).mapTo[String]
)
Expand Down