Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions delphi-webapi
Submodule delphi-webapi added at ed901b
Original file line number Diff line number Diff line change
@@ -1,44 +1,85 @@
// Copyright (C) 2018 The Delphi Team.
// See the LICENCE file distributed with this work for additional
// information regarding copyright ownership.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package de.upb.cs.swt.delphi.instancemanagement

import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport
import spray.json.{DefaultJsonProtocol, JsString, JsValue, JsonFormat}
import spray.json.{DefaultJsonProtocol, DeserializationException, JsString, JsValue, JsonFormat}

trait JsonSupport extends SprayJsonSupport with DefaultJsonProtocol {
implicit val componentTypeFormat = new JsonFormat[InstanceEnums.ComponentType] {

implicit val componentTypeFormat : JsonFormat[InstanceEnums.ComponentType] = new JsonFormat[InstanceEnums.ComponentType] {

def write(compType : InstanceEnums.ComponentType) = JsString(compType.toString)

def read(value: JsValue) = value match {
def read(value: JsValue) : InstanceEnums.ComponentType = value match {
case JsString(s) => s match {
case "Crawler" => InstanceEnums.ComponentType.Crawler
case "WebApi" => InstanceEnums.ComponentType.WebApi
case "WebApp" => InstanceEnums.ComponentType.WebApp
case "DelphiManagement" => InstanceEnums.ComponentType.DelphiManagement
case "ElasticSearch" => InstanceEnums.ComponentType.ElasticSearch
case x => throw new RuntimeException(s"Unexpected string value $x for component type.")
case x => throw DeserializationException(s"Unexpected string value $x for component type.")
}
case y => throw new RuntimeException(s"Unexpected type $y while deserializing component type.")
case y => throw DeserializationException(s"Unexpected type $y while deserializing component type.")
}
}
implicit val instanceFormat = jsonFormat5(Instance)

implicit val stateFormat : JsonFormat[InstanceEnums.State] = new JsonFormat[InstanceEnums.State] {

def write(compType : InstanceEnums.State) = JsString(compType.toString)

def read(value: JsValue) : InstanceEnums.State = value match {
case JsString(s) => s match {
case "Running" => InstanceEnums.InstanceState.Running
case "Stopped" => InstanceEnums.InstanceState.Stopped
case "Failed" => InstanceEnums.InstanceState.Failed
case "Paused" => InstanceEnums.InstanceState.Paused
case "NotReachable" => InstanceEnums.InstanceState.NotReachable
case x => throw DeserializationException(s"Unexpected string value $x for instance state.")
}
case y => throw DeserializationException(s"Unexpected type $y while deserializing instance state.")
}
}

implicit val instanceFormat : JsonFormat[Instance] = jsonFormat7(Instance)
}

final case class Instance (
id: Option[Long],
host: String,
portNumber: Int,
portNumber: Long,
name: String,
componentType: InstanceEnums.ComponentType
componentType: InstanceEnums.ComponentType,
dockerId: Option[String],
instanceState: InstanceEnums.State
)

object InstanceEnums {

type ComponentType = ComponentType.Value
object ComponentType extends Enumeration {
val Crawler = Value("Crawler")
val WebApi = Value("WebApi")
val WebApp = Value("WebApp")
val DelphiManagement = Value("DelphiManagement")
val ElasticSearch = Value("ElasticSearch")
val Crawler : Value = Value("Crawler")
val ElasticSearch : Value = Value("ElasticSearch")
val WebApi : Value = Value("WebApi")
val WebApp : Value = Value("WebApp")
val DelphiManagement : Value = Value("DelphiManagement")
}
type State = InstanceState.Value
object InstanceState extends Enumeration {
val Running : Value = Value("Running")
val Paused : Value = Value("Paused")
val Stopped : Value = Value("Stopped")
val Failed : Value = Value("Failed")
val NotReachable : Value = Value("NotReachable")
}

}
Original file line number Diff line number Diff line change
@@ -1,19 +1,36 @@
// Copyright (C) 2018 The Delphi Team.
// See the LICENCE file distributed with this work for additional
// information regarding copyright ownership.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package de.upb.cs.swt.delphi.instancemanagement

import java.net.InetAddress

import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.marshalling.Marshal
import akka.http.scaladsl.model._
import akka.http.scaladsl.unmarshalling.Unmarshal
import akka.stream.ActorMaterializer
import de.upb.cs.swt.delphi.instancemanagement.InstanceEnums.ComponentType
import akka.util.ByteString
import de.upb.cs.swt.delphi.instancemanagement.InstanceEnums.{ComponentType, InstanceState}
import de.upb.cs.swt.delphi.webapi.{AppLogging, Configuration, Server}

import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.Duration
import scala.concurrent.duration._
import scala.util.{Failure, Success, Try}
import spray.json._

object InstanceRegistry extends JsonSupport with AppLogging
{
Expand All @@ -22,8 +39,85 @@ object InstanceRegistry extends JsonSupport with AppLogging
implicit val ec : ExecutionContext = system.dispatcher
implicit val materializer : ActorMaterializer = Server.materializer

lazy val instanceIdFromEnv : Option[Long] = Try[Long](sys.env("INSTANCE_ID").toLong).toOption


def handleInstanceStart(configuration: Configuration) : Option[Long] = {
instanceIdFromEnv match {
case Some(id) =>
reportStart(configuration) match {
case Success(_) => Some(id)
case Failure(_) => None
}
case None =>
register(configuration) match {
case Success(id) => Some(id)
case Failure(_) => None
}
}
}

def register(configuration: Configuration) : Try[Long] = {
def handleInstanceStop(configuration: Configuration) : Try[Unit] = {
if(instanceIdFromEnv.isDefined) {
reportStop(configuration)
} else {
deregister(configuration)
}
}

def handleInstanceFailure(configuration: Configuration) : Try[Unit] = {
if(instanceIdFromEnv.isDefined) {
reportFailure(configuration)
} else {
deregister(configuration)
}
}

def reportStart(configuration: Configuration) : Try[Unit] = executeReportOperation(configuration, ReportOperationType.Start)

def reportStop(configuration: Configuration) : Try[Unit] = {
if(configuration.usingInstanceRegistry) {
executeReportOperation(configuration, ReportOperationType.Stop)
} else {
Failure(new RuntimeException("Cannot report stop, no instance registry available."))
}
}

def reportFailure(configuration: Configuration) : Try[Unit] = {
if(configuration.usingInstanceRegistry){
executeReportOperation(configuration, ReportOperationType.Failure)
} else {
Failure(new RuntimeException("Cannot report failure, no instance registry available."))
}
}

private def executeReportOperation(configuration: Configuration, operationType: ReportOperationType.Value) : Try[Unit] = {
instanceIdFromEnv match {
case Some(id) =>
val request = HttpRequest(
method = HttpMethods.POST,
configuration.instanceRegistryUri + ReportOperationType.toOperationUriString(operationType, id))

Await.result(Http(system).singleRequest(request) map {response =>
if(response.status == StatusCodes.OK){
log.info(s"Successfully reported ${operationType.toString} to Instance Registry.")
Success()
}
else {
log.warning(s"Failed to report ${operationType.toString} to Instance Registry, server returned ${response.status}")
Failure(new RuntimeException(s"Failed to report ${operationType.toString} to Instance Registry, server returned ${response.status}"))
}

} recover {case ex =>
log.warning(s"Failed to report ${operationType.toString} to Instance Registry, exception: $ex")
Failure(new RuntimeException(s"Failed to report ${operationType.toString} to Instance Registry, exception: $ex"))
}, Duration.Inf)
case None =>
log.warning(s"Cannot report ${operationType.toString} to Instance Registry, no instance id is present in env var 'INSTANCE_ID'.")
Failure(new RuntimeException(s"Cannot report ${operationType.toString} to Instance Registry, no instance id is present in env var 'INSTANCE_ID'."))
}
}
def register(configuration: Configuration) :Try[Long] = {
val instance = createInstance(None,configuration.bindPort, configuration.instanceName)

Await.result(postInstance(instance, configuration.instanceRegistryUri + "/register") map {response =>
Expand Down Expand Up @@ -56,21 +150,26 @@ object InstanceRegistry extends JsonSupport with AppLogging
val request = HttpRequest(method = HttpMethods.GET, configuration.instanceRegistryUri + "/matchingInstance?ComponentType=ElasticSearch")

Await.result(Http(system).singleRequest(request) map {response =>
val status = response.status
if(status == StatusCodes.OK) {

Await.result(Unmarshal(response.entity).to[Instance] map {instance =>
val elasticIP = instance.host
log.info(s"Instance Registry assigned ElasticSearch instance at $elasticIP ")
Success(instance)
} recover {case ex =>
log.warning(s"Failed to read response from Instance Registry, exception: $ex")
Failure(ex)
}, Duration.Inf)
}
else{
log.warning(s"Failed to read response from Instance Registry, server returned $status")
Failure(new RuntimeException(s"Failed to read response from Instance Registry, server returned $status"))
response.status match {
case StatusCodes.OK =>
try {
val instanceString : String = Await.result(response.entity.dataBytes.runFold(ByteString(""))(_ ++ _).map(_.utf8String), 5 seconds)
val esInstance = instanceString.parseJson.convertTo[Instance](instanceFormat)
val elasticIP = esInstance.host
log.info(s"Instance Registry assigned ElasticSearch instance at $elasticIP")
Success(esInstance)
} catch {
case px: spray.json.JsonParser.ParsingException =>
log.warning(s"Failed to read response from Instance Registry, exception: $px")
Failure(px)
}
case StatusCodes.NotFound =>
log.warning(s"No matching instance of type 'ElasticSearch' is present at the instance registry.")
Failure(new RuntimeException(s"Instance Registry did not contain matching instance, server returned ${StatusCodes.NotFound}"))
case _ =>
val status = response.status
log.warning(s"Failed to read matching instance from Instance Registry, server returned $status")
Failure(new RuntimeException(s"Failed to read matching instance from Instance Registry, server returned $status"))
}
} recover { case ex =>
log.warning(s"Failed to request ElasticSearch instance from Instance Registry, exception: $ex ")
Expand Down Expand Up @@ -139,13 +238,89 @@ object InstanceRegistry extends JsonSupport with AppLogging
}
}

def postInstance(instance : Instance, uri: String) () : Future[HttpResponse] =
Marshal(instance).to[RequestEntity] flatMap { entity =>
val request = HttpRequest(method = HttpMethods.POST, uri = uri, entity = entity)
def postInstance(instance : Instance, uri: String) () : Future[HttpResponse] = {
try {
val request = HttpRequest(method = HttpMethods.POST, uri = uri, entity = instance.toJson(instanceFormat).toString())
Http(system).singleRequest(request)
} catch {
case dx: DeserializationException =>
log.warning(s"Failed to deregister to Instance Registry, exception: $dx")
Future.failed(dx)
}
}


private def createInstance(id: Option[Long], controlPort : Int, name : String) : Instance =
Instance(id, InetAddress.getLocalHost.getHostAddress, controlPort, name, ComponentType.WebApi)
Instance(id, InetAddress.getLocalHost.getHostAddress,
controlPort, name, ComponentType.WebApi, None, InstanceState.Running)

def reportStart(id: String, configuration: Configuration):Try[ResponseEntity] ={
val request = HttpRequest(method = HttpMethods.GET, configuration.instanceRegistryUri + "/reportStart")
Await.result(Http(system).singleRequest(request) map {response =>
if(response.status == StatusCodes.OK){
Success(response.entity)
}
else {
val statuscode = response.status
log.warning(s"Failed to perform reportStart, server returned $statuscode")
Failure(new RuntimeException(s"Failed to perform reportStart, server returned $statuscode"))
}
} recover {case ex =>
log.warning(s"Failed to perform reportStart, exception: $ex")
Failure(new RuntimeException(s"Failed to perform reportStart, server returned, exception: $ex"))
}, Duration.Inf)
}

def reportFailure(id: String, configuration: Configuration):Try[ResponseEntity] = {

val request = HttpRequest(method = HttpMethods.GET, configuration.instanceRegistryUri + "/reportFailure")
Await.result(Http(system).singleRequest(request) map {response =>
if(response.status == StatusCodes.OK){
Success(response.entity)
}
else {
val statuscode = response.status
log.warning(s"Failed to perform reportFailure, server returned $statuscode")
Failure(new RuntimeException(s"Failed to perform reportFailure, server returned $statuscode"))
}
} recover {case ex =>
log.warning(s"Failed to perform reportFailure, server returned, exception: $ex")
Failure(new RuntimeException(s"Failed to perform reportFailure, server returned, exception: $ex"))
}, Duration.Inf)
}

def reportStop(id: String, configuration: Configuration):Try[ResponseEntity] = {

val request = HttpRequest(method = HttpMethods.GET, configuration.instanceRegistryUri + "/reportStop")
Await.result(Http(system).singleRequest(request) map {response =>
if(response.status == StatusCodes.OK){
Success(response.entity)
}
else {
val statuscode = response.status
log.warning(s"Failed to perform reportStop, server returned $statuscode")
Failure(new RuntimeException(s"Failed to perform reportStop, server returned $statuscode"))
}
} recover {case ex =>
log.warning(s"Failed to perform reportStop, server returned, exception: $ex")
Failure(new RuntimeException(s"Failed to perform reportStop, server returned, exception: $ex"))
}, Duration.Inf)
}

object ReportOperationType extends Enumeration {
val Start : Value = Value("Start")
val Stop : Value = Value("Stop")
val Failure : Value = Value("Failure")

def toOperationUriString(operation: ReportOperationType.Value, id: Long) : String = {
operation match {
case Start =>
s"/reportStart?Id=$id"
case Stop =>
s"/reportStop?Id=$id"
case _ =>
s"/reportFailure?Id=$id"
}
}
}
}
Loading