Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package de.upb.cs.swt.delphi.instancemanagement

import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport
import spray.json.{DefaultJsonProtocol, JsString, JsValue, JsonFormat}

trait JsonSupport extends SprayJsonSupport with DefaultJsonProtocol {
implicit val componentTypeFormat = new JsonFormat[InstanceEnums.ComponentType] {
def write(compType : InstanceEnums.ComponentType) = JsString(compType.toString)

def read(value: JsValue) = value match {
case JsString(s) => s match {
case "Crawler" => InstanceEnums.ComponentType.Crawler
case "WebApi" => InstanceEnums.ComponentType.WebApi
case "WebApp" => InstanceEnums.ComponentType.WebApp
case "DelphiManagement" => InstanceEnums.ComponentType.DelphiManagement
case "ElasticSearch" => InstanceEnums.ComponentType.ElasticSearch
case x => throw new RuntimeException(s"Unexpected string value $x for component type.")
}
case y => throw new RuntimeException(s"Unexpected type $y while deserializing component type.")
}
}
implicit val instanceFormat = jsonFormat5(Instance)
}

final case class Instance (
id: Option[Long],
host: String,
portNumber: Int,
name: String,
componentType: InstanceEnums.ComponentType
)

object InstanceEnums {

type ComponentType = ComponentType.Value
object ComponentType extends Enumeration {
val Crawler = Value("Crawler")
val WebApi = Value("WebApi")
val WebApp = Value("WebApp")
val DelphiManagement = Value("DelphiManagement")
val ElasticSearch = Value("ElasticSearch")
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
package de.upb.cs.swt.delphi.instancemanagement

import java.net.InetAddress

import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.marshalling.Marshal
import akka.http.scaladsl.model._
import akka.http.scaladsl.unmarshalling.Unmarshal
import akka.stream.ActorMaterializer
import de.upb.cs.swt.delphi.instancemanagement.InstanceEnums.ComponentType
import de.upb.cs.swt.delphi.webapi.{AppLogging, Configuration, Server}

import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.Duration
import scala.util.{Failure, Success, Try}

object InstanceRegistry extends JsonSupport with AppLogging
{

implicit val system : ActorSystem = Server.system
implicit val ec : ExecutionContext = system.dispatcher
implicit val materializer : ActorMaterializer = Server.materializer


def register(configuration: Configuration) : Try[Long] = {
val instance = createInstance(None,configuration.bindPort, configuration.instanceName)

Await.result(postInstance(instance, configuration.instanceRegistryUri + "/register") map {response =>
if(response.status == StatusCodes.OK){
Await.result(Unmarshal(response.entity).to[String] map { assignedID =>
val id = assignedID.toLong
log.info(s"Successfully registered at Instance Registry, got ID $id.")
Success(id)
} recover { case ex =>
log.warning(s"Failed to read assigned ID from Instance Registry, exception: $ex")
Failure(ex)
}, Duration.Inf)
}
else {
val statuscode = response.status
log.warning(s"Failed to register at Instance Registry, server returned $statuscode")
Failure(new RuntimeException(s"Failed to register at Instance Registry, server returned $statuscode"))
}

} recover {case ex =>
log.warning(s"Failed to register at Instance Registry, exception: $ex")
Failure(ex)
}, Duration.Inf)
}

def retrieveElasticSearchInstance(configuration: Configuration) : Try[Instance] = {
if(!configuration.usingInstanceRegistry) {
Failure(new RuntimeException("Cannot get ElasticSearch instance from Instance Registry, no Instance Registry available."))
} else {
val request = HttpRequest(method = HttpMethods.GET, configuration.instanceRegistryUri + "/matchingInstance?ComponentType=ElasticSearch")

Await.result(Http(system).singleRequest(request) map {response =>
val status = response.status
if(status == StatusCodes.OK) {

Await.result(Unmarshal(response.entity).to[Instance] map {instance =>
val elasticIP = instance.host
log.info(s"Instance Registry assigned ElasticSearch instance at $elasticIP ")
Success(instance)
} recover {case ex =>
log.warning(s"Failed to read response from Instance Registry, exception: $ex")
Failure(ex)
}, Duration.Inf)
}
else{
log.warning(s"Failed to read response from Instance Registry, server returned $status")
Failure(new RuntimeException(s"Failed to read response from Instance Registry, server returned $status"))
}
} recover { case ex =>
log.warning(s"Failed to request ElasticSearch instance from Instance Registry, exception: $ex ")
Failure(ex)
}, Duration.Inf)
}
}

def sendMatchingResult(isElasticSearchReachable : Boolean, configuration: Configuration) : Try[Unit] = {

if(!configuration.usingInstanceRegistry) {
Failure(new RuntimeException("Cannot post matching result to Instance Registry, no Instance Registry available."))
} else {
if(configuration.elasticsearchInstance.id.isEmpty) {
Failure(new RuntimeException("Cannot post matching result to Instance Registry, assigned ElasticSearch instance has no ID."))
} else {
val idToPost = configuration.elasticsearchInstance.id.getOrElse(-1L)
val request = HttpRequest(
method = HttpMethods.POST,
configuration.instanceRegistryUri + s"/matchingResult?Id=$idToPost&MatchingSuccessful=$isElasticSearchReachable")

Await.result(Http(system).singleRequest(request) map {response =>
val status=response.status
if(response.status == StatusCodes.OK){
log.info(s"Successfully posted matching result to Instance Registry.")
Success()
}
else {
val statuscode = response.status
log.warning(s"Failed to post matching result to Instance Registry, server returned $statuscode")
Failure(new RuntimeException(s"Failed to post matching result to Instance Registry, server returned $statuscode"))
}

} recover {case ex =>
log.warning(s"Failed to post matching result to Instance Registry, exception: $ex")
Failure(new RuntimeException(s"Failed to post matching result tot Instance Registry, exception: $ex"))
}, Duration.Inf)
}
}

}

def deregister(configuration: Configuration) : Try[Unit] = {
if(!configuration.usingInstanceRegistry){
Failure(new RuntimeException("Cannot deregister from Instance Registry, no Instance Registry available."))
} else {
val id : Long = configuration.assignedID.getOrElse(-1L)

val request = HttpRequest(method = HttpMethods.POST, configuration.instanceRegistryUri + s"/deregister?Id=$id")

Await.result(Http(system).singleRequest(request) map {response =>
if(response.status == StatusCodes.OK){
log.info("Successfully deregistered from Instance Registry.")
Success()
}
else {
val statuscode = response.status
log.warning(s"Failed to deregister from Instance Registry, server returned $statuscode")
Failure(new RuntimeException(s"Failed to deregister from Instance Registry, server returned $statuscode"))
}

} recover {case ex =>
log.warning(s"Failed to deregister to Instance Registry, exception: $ex")
Failure(ex)
}, Duration.Inf)
}
}

def postInstance(instance : Instance, uri: String) () : Future[HttpResponse] =
Marshal(instance).to[RequestEntity] flatMap { entity =>
val request = HttpRequest(method = HttpMethods.POST, uri = uri, entity = entity)
Http(system).singleRequest(request)
}


private def createInstance(id: Option[Long], controlPort : Int, name : String) : Instance =
Instance(id, InetAddress.getLocalHost.getHostAddress, controlPort, name, ComponentType.WebApi)
}
57 changes: 54 additions & 3 deletions src/main/scala/de/upb/cs/swt/delphi/webapi/Configuration.scala
Original file line number Diff line number Diff line change
@@ -1,20 +1,71 @@
package de.upb.cs.swt.delphi.webapi

import com.sksamuel.elastic4s.{ElasticsearchClientUri, IndexAndType}
import com.sksamuel.elastic4s.http.ElasticDsl._
import com.sksamuel.elastic4s.{ElasticsearchClientUri, IndexAndType}
import de.upb.cs.swt.delphi.instancemanagement.InstanceEnums.ComponentType
import de.upb.cs.swt.delphi.instancemanagement.{Instance, InstanceRegistry}

import scala.util.{Failure, Success, Try}

/**
* @author Ben Hermann
*/
class Configuration( //Server and Elasticsearch configuration
val bindHost: String = "0.0.0.0",
val bindPort: Int = 8080,
val elasticsearchClientUri: ElasticsearchClientUri = ElasticsearchClientUri(
sys.env.getOrElse("DELPHI_ELASTIC_URI", "elasticsearch://localhost:9200")),
val esProjectIndex: IndexAndType = "delphi" / "project",

//Actor system configuration
val elasticActorPoolSize: Int = 8
) {


lazy val elasticsearchClientUri: ElasticsearchClientUri = ElasticsearchClientUri(
elasticsearchInstance.host + ":" + elasticsearchInstance.portNumber)

lazy val elasticsearchInstance : Instance = InstanceRegistry.retrieveElasticSearchInstance(this) match {
case Success(instance) => instance
case Failure(_) => Instance(
None,
fallbackElasticSearchHost,
fallbackElasticSearchPort,
"Default ElasticSearch instance",
ComponentType.ElasticSearch)
}
val defaultElasticSearchPort : Int = 9200
val defaultElasticSearchHost : String = "elasticsearch://localhost"
val instanceName = "MyWebApiInstance"
val instanceRegistryUri : String = sys.env.getOrElse("DELPHI_IR_URI", "http://localhost:8087")
lazy val usingInstanceRegistry : Boolean = assignedID match {
case Some(_) => true
case None => false
}
lazy val assignedID : Option[Long] = InstanceRegistry.register(this) match {
case Success(id) => Some(id)
case Failure(_) => None
}
lazy val fallbackElasticSearchPort : Int = sys.env.get("DELPHI_ELASTIC_URI") match {
case Some(hostString) => if(hostString.count(c => c == ':') == 3){
Try(hostString.split(":")(2).toInt) match {
case Success(port) => port
case Failure(_) => defaultElasticSearchPort
}
} else {
defaultElasticSearchPort
}
case None => defaultElasticSearchPort
}

lazy val fallbackElasticSearchHost : String = sys.env.get("DELPHI_ELASTIC_URI") match {
case Some(hostString) =>
if(hostString.count(c => c == ':') == 2){
hostString.substring(0,hostString.lastIndexOf(":"))
} else {
defaultElasticSearchHost
}
case None => defaultElasticSearchHost

}
}


9 changes: 7 additions & 2 deletions src/main/scala/de/upb/cs/swt/delphi/webapi/Server.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@ import java.util.concurrent.TimeUnit
import akka.actor.ActorSystem
import akka.http.scaladsl.server.HttpApp
import akka.pattern.ask
import akka.stream.ActorMaterializer
import akka.util.Timeout
import de.upb.cs.swt.delphi.featuredefinitions.FeatureListMapping
import de.upb.cs.swt.delphi.instancemanagement.InstanceRegistry
import de.upb.cs.swt.delphi.webapi.ElasticActorManager.{Enqueue, Retrieve}
import de.upb.cs.swt.delphi.webapi.ElasticRequestLimiter.Validate
import spray.json._
Expand All @@ -17,10 +19,11 @@ import spray.json._
object Server extends HttpApp with JsonSupport with AppLogging {

private val configuration = new Configuration()
private val system = ActorSystem("delphi-webapi")
implicit val system = ActorSystem("delphi-webapi")
private val actorManager = system.actorOf(ElasticActorManager.props(configuration))
private val requestLimiter = system.actorOf(ElasticRequestLimiter.props(configuration, actorManager))
implicit val timeout = Timeout(5, TimeUnit.SECONDS)
implicit val materializer = ActorMaterializer()

override def routes =
path("version") {
Expand Down Expand Up @@ -83,12 +86,14 @@ object Server extends HttpApp with JsonSupport with AppLogging {
}

def main(args: Array[String]): Unit = {
val configuration = new Configuration()
StartupCheck.check(configuration)
Server.startServer(configuration.bindHost, configuration.bindPort)
InstanceRegistry.deregister(configuration)
system.terminate()
}



}


33 changes: 33 additions & 0 deletions src/main/scala/de/upb/cs/swt/delphi/webapi/StartupCheck.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package de.upb.cs.swt.delphi.webapi

import akka.actor.ActorSystem
import com.sksamuel.elastic4s.http.ElasticClient
import com.sksamuel.elastic4s.http.ElasticDsl._
import de.upb.cs.swt.delphi.instancemanagement.InstanceRegistry
import scala.concurrent.duration.Duration
import scala.concurrent.{Await, ExecutionContext}
import scala.util.{Failure, Success, Try}

object StartupCheck extends AppLogging {
def check(configuration: Configuration)(implicit system: ActorSystem): Try[Configuration] = {
log.warning("Performing Instance Registry checks")
implicit val ec : ExecutionContext = system.dispatcher
lazy val client = ElasticClient(configuration.elasticsearchClientUri)

val f = (client.execute {
nodeInfo()
} map { i => {
InstanceRegistry.sendMatchingResult(isElasticSearchReachable = true, configuration)
Success(configuration)
}
} recover { case e =>
InstanceRegistry.sendMatchingResult(isElasticSearchReachable = false, configuration)
Failure(e)

}).andThen {
case _ => client.close()
}

Await.result(f, Duration.Inf)
}
}