Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package de.upb.cs.swt.delphi.instancemanagement

import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport
import spray.json.{DefaultJsonProtocol, JsString, JsValue, JsonFormat}

trait JsonSupport extends SprayJsonSupport with DefaultJsonProtocol {
implicit val componentTypeFormat = new JsonFormat[InstanceEnums.ComponentType] {
def write(compType : InstanceEnums.ComponentType) = JsString(compType.toString())

def read(value: JsValue) = value match {
case JsString(s) => s match {
case "Crawler" => InstanceEnums.ComponentType.Crawler
case "WebApi" => InstanceEnums.ComponentType.WebApi
case "WebApp" => InstanceEnums.ComponentType.WebApp
case "DelphiManagement" => InstanceEnums.ComponentType.DelphiManagement
case "ElasticSearch" => InstanceEnums.ComponentType.ElasticSearch
case x => throw new RuntimeException(s"Unexpected string value $x for component type.")
}
case y => throw new RuntimeException(s"Unexpected type $y while deserializing component type.")
}
}
implicit val instanceFormat = jsonFormat5(Instance)
}

final case class Instance (
iD: Option[Long],
iP: Option[String],
portnumber: Option[Long],
name: Option[String],
/* Component Type */
componentType: Option[InstanceEnums.ComponentType]
)

object InstanceEnums {

type ComponentType = ComponentType.Value
object ComponentType extends Enumeration {
val Crawler = Value("Crawler")
val WebApi = Value("WebApi")
val WebApp = Value("WebApp")
val DelphiManagement = Value("DelphiManagement")
val ElasticSearch = Value("ElasticSearch")
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
package de.upb.cs.swt.delphi.instancemanagement

import java.net.InetAddress

import akka.http.scaladsl.Http
import akka.http.scaladsl.marshalling.Marshal
import akka.http.scaladsl.model._
import akka.http.scaladsl.unmarshalling.Unmarshal
import de.upb.cs.swt.delphi.instancemanagement.InstanceEnums.ComponentType
import de.upb.cs.swt.delphi.webapi.{AppLogging, Configuration, Server}


import scala.concurrent.{Await, Future}
import scala.concurrent.duration.Duration
import scala.util.{Failure, Success, Try}

object InstanceRegistry extends JsonSupport with AppLogging
{

implicit val system = Server.system
implicit val ec = system.dispatcher
implicit val materializer = Server.materializer


def register(configuration: Configuration) : Try[Long] = {
val instance = createInstance(None,configuration.bindPort, configuration.instanceName)

Await.result(postInstance(instance, configuration.instanceRegistryUri + "/register") map {response =>
if(response.status == StatusCodes.OK){
Await.result(Unmarshal(response.entity).to[String] map { assignedID =>
val id = assignedID.toLong
log.info(s"Successfully registered at Instance Registry, got ID $id.")
Success(id)
} recover { case ex =>
log.warning(s"Failed to read assigned ID from Instance Registry, exception: $ex")
Failure(ex)
}, Duration.Inf)
}
else {
val statuscode = response.status
log.warning(s"Failed to register at Instance Registry, server returned $statuscode")
Failure(new RuntimeException(s"Failed to register at Instance Registry, server returned $statuscode"))
}

} recover {case ex =>
log.warning(s"Failed to register at Instance Registry, exception: $ex")
Failure(ex)
}, Duration.Inf)
}

def retrieveElasticSearchInstance(configuration: Configuration) : Try[Instance] = {
if(!configuration.usingInstanceRegistry) {
Failure(new RuntimeException("Cannot get ElasticSearch instance from Instance Registry, no Instance Registry available."))
} else {
val request = HttpRequest(method = HttpMethods.GET, configuration.instanceRegistryUri + "/matchingInstance?ComponentType=ElasticSearch")

Await.result(Http(system).singleRequest(request) map {response =>
val status = response.status
if(status == StatusCodes.OK) {

Await.result(Unmarshal(response.entity).to[Instance] map {instance =>
val elasticIP = instance.iP
log.info(s"Instance Registry assigned ElasticSearch instance at ${elasticIP.getOrElse("None")}")
Success(instance)
} recover {case ex =>
log.warning(s"Failed to read response from Instance Registry, exception: $ex")
Failure(ex)
}, Duration.Inf)
}
else{
log.warning(s"Failed to read response from Instance Registry, server returned $status")
Failure(new RuntimeException(s"Failed to read response from Instance Registry, server returned $status"))
}
} recover { case ex =>
log.warning(s"Failed to request ElasticSearch instance from Instance Registry, exception: $ex ")
Failure(ex)
}, Duration.Inf)
}
}

def sendMatchingResult(isElasticSearchReachable : Boolean, configuration: Configuration) : Try[Unit] = {
if(!configuration.usingInstanceRegistry) {
Failure(new RuntimeException("Cannot post matching result to Instance Registry, no Instance Registry available."))
} else {
if(configuration.elasticsearchInstance.iD.isEmpty) {
Failure(new RuntimeException("Cannot post matching result to Instance Registry, assigned ElasticSearch instance has no ID."))
} else {
val IdToPost = configuration.elasticsearchInstance.iD.get
val request = HttpRequest(
method = HttpMethods.POST,
configuration.instanceRegistryUri + s"/matchingResult?Id=$IdToPost&MatchingSuccessful=$isElasticSearchReachable")

Await.result(Http(system).singleRequest(request) map {response =>
if(response.status == StatusCodes.OK){
log.info("Successfully posted matching result to Instance Registry.")
Success()
}
else {
val statuscode = response.status
log.warning(s"Failed to post matching result to Instance Registry, server returned $statuscode")
Failure(new RuntimeException(s"Failed to post matching result to Instance Registry, server returned $statuscode"))
}

} recover {case ex =>
log.warning(s"Failed to post matching result to Instance Registry, exception: $ex")
Failure(new RuntimeException(s"Failed to post matching result tot Instance Registry, exception: $ex"))
}, Duration.Inf)
}
}

}

def deregister(configuration: Configuration) : Try[Unit] = {
if(!configuration.usingInstanceRegistry){
Failure(new RuntimeException("Cannot deregister from Instance Registry, no Instance Registry available."))
} else {
val id : Long = configuration.assignedID.get

val request = HttpRequest(method = HttpMethods.POST, configuration.instanceRegistryUri + s"/deregister?Id=$id")

Await.result(Http(system).singleRequest(request) map {response =>
if(response.status == StatusCodes.OK){
log.info("Successfully deregistered from Instance Registry.")
Success()
}
else {
val statuscode = response.status
log.warning(s"Failed to deregister from Instance Registry, server returned $statuscode")
Failure(new RuntimeException(s"Failed to deregister from Instance Registry, server returned $statuscode"))
}

} recover {case ex =>
log.warning(s"Failed to deregister to Instance Registry, exception: $ex")
Failure(ex)
}, Duration.Inf)
}
}

def postInstance(instance : Instance, uri: String) () : Future[HttpResponse] =
Marshal(instance).to[RequestEntity] flatMap { entity =>
val request = HttpRequest(method = HttpMethods.POST, uri = uri, entity = entity)
Http(system).singleRequest(request)
}


private def createInstance(id: Option[Long], controlPort : Int, name : String) : Instance =
Instance(id, Option(InetAddress.getLocalHost.getHostAddress), Option(controlPort), Option(name), Option(ComponentType.Crawler))
}
31 changes: 29 additions & 2 deletions src/main/scala/de/upb/cs/swt/delphi/webapi/Configuration.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,46 @@ package de.upb.cs.swt.delphi.webapi

import com.sksamuel.elastic4s.{ElasticsearchClientUri, IndexAndType}
import com.sksamuel.elastic4s.http.ElasticDsl._
import de.upb.cs.swt.delphi.instancemanagement.InstanceEnums.ComponentType
import de.upb.cs.swt.delphi.instancemanagement.{Instance, InstanceRegistry}

import scala.util.{Failure, Success}

/**
* @author Ben Hermann
*/
class Configuration( //Server and Elasticsearch configuration
val bindHost: String = "0.0.0.0",
val bindPort: Int = 8080,
val elasticsearchClientUri: ElasticsearchClientUri = ElasticsearchClientUri(
sys.env.getOrElse("DELPHI_ELASTIC_URI", "elasticsearch://localhost:9200")),
val esProjectIndex: IndexAndType = "delphi" / "project",

//Actor system configuration
val elasticActorPoolSize: Int = 8
) {


lazy val elasticsearchClientUri: ElasticsearchClientUri = ElasticsearchClientUri({
if(elasticsearchInstance.portnumber.isEmpty){
elasticsearchInstance.iP.get
}else{
elasticsearchInstance.iP.get + ":" + elasticsearchInstance.portnumber.get
}
})

lazy val elasticsearchInstance : Instance = InstanceRegistry.retrieveElasticSearchInstance(this) match {
case Success(instance) => instance
case Failure(_) => Instance(None, Some(sys.env.getOrElse("DELPHI_ELASTIC_URI","elasticsearch://localhost:9200")), None, Some("Default ElasticSearch instance"), Some(ComponentType.ElasticSearch) )
}

val instanceName = "MyWebApiInstance"
val instanceRegistryUri : String = sys.env.getOrElse("DELPHI_IR_URI", "http://localhost:8085")
lazy val usingInstanceRegistry : Boolean = assignedID match {
case Some(_) => true
case None => false
}
lazy val assignedID : Option[Long] = InstanceRegistry.register(this) match {
case Success(id) => Some(id)
case Failure(_) => None
}

}
36 changes: 33 additions & 3 deletions src/main/scala/de/upb/cs/swt/delphi/webapi/Server.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,24 +5,34 @@ import java.util.concurrent.TimeUnit
import akka.actor.ActorSystem
import akka.http.scaladsl.server.HttpApp
import akka.pattern.ask
import akka.stream.ActorMaterializer
import akka.util.Timeout
import com.sksamuel.elastic4s.http.ElasticClient
import com.sksamuel.elastic4s.http.ElasticDsl._
import de.upb.cs.swt.delphi.featuredefinitions.FeatureListMapping
import de.upb.cs.swt.delphi.instancemanagement.InstanceRegistry
import de.upb.cs.swt.delphi.webapi.ElasticActorManager.{Enqueue, Retrieve}
import de.upb.cs.swt.delphi.webapi.ElasticRequestLimiter.Validate
import spray.json._

import scala.concurrent.duration.Duration
import scala.concurrent.{Await, ExecutionContext}
import scala.util.{Failure, Success}

/**
* Web server configuration for Delphi web API.
*/
object Server extends HttpApp with JsonSupport with AppLogging {

private val configuration = new Configuration()
private val system = ActorSystem("delphi-webapi")
implicit val system = ActorSystem("delphi-webapi")
private val actorManager = system.actorOf(ElasticActorManager.props(configuration))
private val requestLimiter = system.actorOf(ElasticRequestLimiter.props(configuration, actorManager))
implicit val timeout = Timeout(5, TimeUnit.SECONDS)
implicit val materializer = ActorMaterializer()


override def routes =
override def routes =
path("version") { version } ~
path("features") { features } ~
pathPrefix("search" / Remaining) { query => search(query) } ~
Expand Down Expand Up @@ -79,8 +89,28 @@ object Server extends HttpApp with JsonSupport with AppLogging {
}

def main(args: Array[String]): Unit = {
val configuration = new Configuration()

implicit val ec : ExecutionContext = system.dispatcher
lazy val client = ElasticClient(configuration.elasticsearchClientUri)

val f = (client.execute {
nodeInfo()
} map { i => {
if(configuration.usingInstanceRegistry) InstanceRegistry.sendMatchingResult(true, configuration)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As in the other PR: I would like to move this condition into the method.

Success(configuration)
}
} recover { case e => {
if(configuration.usingInstanceRegistry) InstanceRegistry.sendMatchingResult(false, configuration)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as above

Failure(e)
}
}).andThen {
case _ => client.close()
}

Await.ready(f, Duration.Inf)

Server.startServer(configuration.bindHost, configuration.bindPort)
InstanceRegistry.deregister(configuration)
system.terminate()
}

Expand Down