From d47316de0f13721fc783478f1ad1fdd76398fc16 Mon Sep 17 00:00:00 2001 From: ayybeeshafi Date: Tue, 16 Oct 2018 19:38:03 +0200 Subject: [PATCH 1/2] Implementation of functions with respect to the new API. Other funtions are working fine, unable to test reportstop() and resportstart() from inside the container by exporting enviournment variable -INSTANCE_ID- refs #23 --- delphi-webapi | 1 + .../delphi/instancemanagement/Instance.scala | 76 ++++-- .../instancemanagement/InstanceRegistry.scala | 223 ++++++++++++++++-- .../cs/swt/delphi/webapi/Configuration.scala | 26 +- 4 files changed, 280 insertions(+), 46 deletions(-) create mode 160000 delphi-webapi diff --git a/delphi-webapi b/delphi-webapi new file mode 160000 index 0000000..ed901bf --- /dev/null +++ b/delphi-webapi @@ -0,0 +1 @@ +Subproject commit ed901bf01811ac64126906ab10340da0f87fe0e8 diff --git a/src/main/scala/de/upb/cs/swt/delphi/instancemanagement/Instance.scala b/src/main/scala/de/upb/cs/swt/delphi/instancemanagement/Instance.scala index 1e51a03..de9c694 100644 --- a/src/main/scala/de/upb/cs/swt/delphi/instancemanagement/Instance.scala +++ b/src/main/scala/de/upb/cs/swt/delphi/instancemanagement/Instance.scala @@ -1,45 +1,85 @@ +// Copyright (C) 2018 The Delphi Team. +// See the LICENCE file distributed with this work for additional +// information regarding copyright ownership. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. package de.upb.cs.swt.delphi.instancemanagement - import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport -import spray.json.{DefaultJsonProtocol, JsString, JsValue, JsonFormat} +import spray.json.{DefaultJsonProtocol, DeserializationException, JsString, JsValue, JsonFormat} trait JsonSupport extends SprayJsonSupport with DefaultJsonProtocol { - implicit val componentTypeFormat = new JsonFormat[InstanceEnums.ComponentType] { + + implicit val componentTypeFormat : JsonFormat[InstanceEnums.ComponentType] = new JsonFormat[InstanceEnums.ComponentType] { + def write(compType : InstanceEnums.ComponentType) = JsString(compType.toString) - def read(value: JsValue) = value match { + def read(value: JsValue) : InstanceEnums.ComponentType = value match { case JsString(s) => s match { case "Crawler" => InstanceEnums.ComponentType.Crawler case "WebApi" => InstanceEnums.ComponentType.WebApi case "WebApp" => InstanceEnums.ComponentType.WebApp case "DelphiManagement" => InstanceEnums.ComponentType.DelphiManagement case "ElasticSearch" => InstanceEnums.ComponentType.ElasticSearch - case x => throw new RuntimeException(s"Unexpected string value $x for component type.") + case x => throw DeserializationException(s"Unexpected string value $x for component type.") } - case y => throw new RuntimeException(s"Unexpected type $y while deserializing component type.") + case y => throw DeserializationException(s"Unexpected type $y while deserializing component type.") } } - implicit val instanceFormat = jsonFormat5(Instance) + + implicit val stateFormat : JsonFormat[InstanceEnums.State] = new JsonFormat[InstanceEnums.State] { + + def write(compType : InstanceEnums.State) = JsString(compType.toString) + + def read(value: JsValue) : InstanceEnums.State = value match { + case JsString(s) => s match { + case "Running" => InstanceEnums.InstanceState.Running + case "Stopped" => InstanceEnums.InstanceState.Stopped + case "Failed" => InstanceEnums.InstanceState.Failed + case "Paused" => InstanceEnums.InstanceState.Paused + case "NotReachable" => InstanceEnums.InstanceState.NotReachable + case x => throw DeserializationException(s"Unexpected string value $x for instance state.") + } + case y => throw DeserializationException(s"Unexpected type $y while deserializing instance state.") + } + } + + implicit val instanceFormat : JsonFormat[Instance] = jsonFormat7(Instance) } final case class Instance ( id: Option[Long], host: String, - portNumber: Int, + portNumber: Long, name: String, - /* Component Type */ - componentType: InstanceEnums.ComponentType + componentType: InstanceEnums.ComponentType, + dockerId: Option[String], + instanceState: InstanceEnums.State ) - object InstanceEnums { - type ComponentType = ComponentType.Value object ComponentType extends Enumeration { - val Crawler = Value("Crawler") - val WebApi = Value("WebApi") - val WebApp = Value("WebApp") - val DelphiManagement = Value("DelphiManagement") - val ElasticSearch = Value("ElasticSearch") + val Crawler : Value = Value("Crawler") + val ElasticSearch : Value = Value("ElasticSearch") + val WebApi : Value = Value("WebApi") + val WebApp : Value = Value("WebApp") + val DelphiManagement : Value = Value("DelphiManagement") + } + type State = InstanceState.Value + object InstanceState extends Enumeration { + val Running : Value = Value("Running") + val Paused : Value = Value("Paused") + val Stopped : Value = Value("Stopped") + val Failed : Value = Value("Failed") + val NotReachable : Value = Value("NotReachable") } - } \ No newline at end of file diff --git a/src/main/scala/de/upb/cs/swt/delphi/instancemanagement/InstanceRegistry.scala b/src/main/scala/de/upb/cs/swt/delphi/instancemanagement/InstanceRegistry.scala index fdbd6c2..f01d42d 100644 --- a/src/main/scala/de/upb/cs/swt/delphi/instancemanagement/InstanceRegistry.scala +++ b/src/main/scala/de/upb/cs/swt/delphi/instancemanagement/InstanceRegistry.scala @@ -1,19 +1,36 @@ +// Copyright (C) 2018 The Delphi Team. +// See the LICENCE file distributed with this work for additional +// information regarding copyright ownership. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + package de.upb.cs.swt.delphi.instancemanagement import java.net.InetAddress import akka.actor.ActorSystem import akka.http.scaladsl.Http -import akka.http.scaladsl.marshalling.Marshal import akka.http.scaladsl.model._ import akka.http.scaladsl.unmarshalling.Unmarshal import akka.stream.ActorMaterializer -import de.upb.cs.swt.delphi.instancemanagement.InstanceEnums.ComponentType +import akka.util.ByteString +import de.upb.cs.swt.delphi.instancemanagement.InstanceEnums.{ComponentType, InstanceState} import de.upb.cs.swt.delphi.webapi.{AppLogging, Configuration, Server} import scala.concurrent.{Await, ExecutionContext, Future} -import scala.concurrent.duration.Duration +import scala.concurrent.duration._ import scala.util.{Failure, Success, Try} +import spray.json._ object InstanceRegistry extends JsonSupport with AppLogging { @@ -22,8 +39,85 @@ object InstanceRegistry extends JsonSupport with AppLogging implicit val ec : ExecutionContext = system.dispatcher implicit val materializer : ActorMaterializer = Server.materializer + lazy val instanceIdFromEnv : Option[Long] = Try[Long](sys.env("INSTANCE_ID").toLong).toOption + + + def handleInstanceStart(configuration: Configuration) : Option[Long] = { + instanceIdFromEnv match { + case Some(id) => + reportStart(configuration) match { + case Success(_) => Some(id) + case Failure(_) => None + } + case None => + register(configuration) match { + case Success(id) => Some(id) + case Failure(_) => None + } + } + } - def register(configuration: Configuration) : Try[Long] = { + def handleInstanceStop(configuration: Configuration) : Try[Unit] = { + if(instanceIdFromEnv.isDefined) { + reportStop(configuration) + } else { + deregister(configuration) + } + } + + def handleInstanceFailure(configuration: Configuration) : Try[Unit] = { + if(instanceIdFromEnv.isDefined) { + reportFailure(configuration) + } else { + deregister(configuration) + } + } + + def reportStart(configuration: Configuration) : Try[Unit] = executeReportOperation(configuration, ReportOperationType.Start) + + def reportStop(configuration: Configuration) : Try[Unit] = { + if(configuration.usingInstanceRegistry) { + executeReportOperation(configuration, ReportOperationType.Stop) + } else { + Failure(new RuntimeException("Cannot report stop, no instance registry available.")) + } + } + + def reportFailure(configuration: Configuration) : Try[Unit] = { + if(configuration.usingInstanceRegistry){ + executeReportOperation(configuration, ReportOperationType.Failure) + } else { + Failure(new RuntimeException("Cannot report failure, no instance registry available.")) + } + } + + private def executeReportOperation(configuration: Configuration, operationType: ReportOperationType.Value) : Try[Unit] = { + instanceIdFromEnv match { + case Some(id) => + val request = HttpRequest( + method = HttpMethods.POST, + configuration.instanceRegistryUri + ReportOperationType.toOperationUriString(operationType, id)) + + Await.result(Http(system).singleRequest(request) map {response => + if(response.status == StatusCodes.OK){ + log.info(s"Successfully reported ${operationType.toString} to Instance Registry.") + Success() + } + else { + log.warning(s"Failed to report ${operationType.toString} to Instance Registry, server returned ${response.status}") + Failure(new RuntimeException(s"Failed to report ${operationType.toString} to Instance Registry, server returned ${response.status}")) + } + + } recover {case ex => + log.warning(s"Failed to report ${operationType.toString} to Instance Registry, exception: $ex") + Failure(new RuntimeException(s"Failed to report ${operationType.toString} to Instance Registry, exception: $ex")) + }, Duration.Inf) + case None => + log.warning(s"Cannot report ${operationType.toString} to Instance Registry, no instance id is present in env var 'INSTANCE_ID'.") + Failure(new RuntimeException(s"Cannot report ${operationType.toString} to Instance Registry, no instance id is present in env var 'INSTANCE_ID'.")) + } + } + def register(configuration: Configuration) :Try[Long] = { val instance = createInstance(None,configuration.bindPort, configuration.instanceName) Await.result(postInstance(instance, configuration.instanceRegistryUri + "/register") map {response => @@ -56,21 +150,26 @@ object InstanceRegistry extends JsonSupport with AppLogging val request = HttpRequest(method = HttpMethods.GET, configuration.instanceRegistryUri + "/matchingInstance?ComponentType=ElasticSearch") Await.result(Http(system).singleRequest(request) map {response => - val status = response.status - if(status == StatusCodes.OK) { - - Await.result(Unmarshal(response.entity).to[Instance] map {instance => - val elasticIP = instance.host - log.info(s"Instance Registry assigned ElasticSearch instance at $elasticIP ") - Success(instance) - } recover {case ex => - log.warning(s"Failed to read response from Instance Registry, exception: $ex") - Failure(ex) - }, Duration.Inf) - } - else{ - log.warning(s"Failed to read response from Instance Registry, server returned $status") - Failure(new RuntimeException(s"Failed to read response from Instance Registry, server returned $status")) + response.status match { + case StatusCodes.OK => + try { + val instanceString : String = Await.result(response.entity.dataBytes.runFold(ByteString(""))(_ ++ _).map(_.utf8String), 5 seconds) + val esInstance = instanceString.parseJson.convertTo[Instance](instanceFormat) + val elasticIP = esInstance.host + log.info(s"Instance Registry assigned ElasticSearch instance at $elasticIP") + Success(esInstance) + } catch { + case px: spray.json.JsonParser.ParsingException => + log.warning(s"Failed to read response from Instance Registry, exception: $px") + Failure(px) + } + case StatusCodes.NotFound => + log.warning(s"No matching instance of type 'ElasticSearch' is present at the instance registry.") + Failure(new RuntimeException(s"Instance Registry did not contain matching instance, server returned ${StatusCodes.NotFound}")) + case _ => + val status = response.status + log.warning(s"Failed to read matching instance from Instance Registry, server returned $status") + Failure(new RuntimeException(s"Failed to read matching instance from Instance Registry, server returned $status")) } } recover { case ex => log.warning(s"Failed to request ElasticSearch instance from Instance Registry, exception: $ex ") @@ -139,13 +238,89 @@ object InstanceRegistry extends JsonSupport with AppLogging } } - def postInstance(instance : Instance, uri: String) () : Future[HttpResponse] = - Marshal(instance).to[RequestEntity] flatMap { entity => - val request = HttpRequest(method = HttpMethods.POST, uri = uri, entity = entity) + def postInstance(instance : Instance, uri: String) () : Future[HttpResponse] = { + try { + val request = HttpRequest(method = HttpMethods.POST, uri = uri, entity = instance.toJson(instanceFormat).toString()) Http(system).singleRequest(request) + } catch { + case dx: DeserializationException => + log.warning(s"Failed to deregister to Instance Registry, exception: $dx") + Future.failed(dx) } + } private def createInstance(id: Option[Long], controlPort : Int, name : String) : Instance = - Instance(id, InetAddress.getLocalHost.getHostAddress, controlPort, name, ComponentType.WebApi) -} + Instance(id, InetAddress.getLocalHost.getHostAddress, + controlPort, name, ComponentType.WebApi, None, InstanceState.Running) + + def reportStart(id: String, configuration: Configuration):Try[ResponseEntity] ={ + val request = HttpRequest(method = HttpMethods.GET, configuration.instanceRegistryUri + "/reportStart") + Await.result(Http(system).singleRequest(request) map {response => + if(response.status == StatusCodes.OK){ + Success(response.entity) + } + else { + val statuscode = response.status + log.warning(s"Failed to perform reportStart, server returned $statuscode") + Failure(new RuntimeException(s"Failed to perform reportStart, server returned $statuscode")) + } + } recover {case ex => + log.warning(s"Failed to perform reportStart, exception: $ex") + Failure(new RuntimeException(s"Failed to perform reportStart, server returned, exception: $ex")) + }, Duration.Inf) + } + + def reportFailure(id: String, configuration: Configuration):Try[ResponseEntity] = { + + val request = HttpRequest(method = HttpMethods.GET, configuration.instanceRegistryUri + "/reportFailure") + Await.result(Http(system).singleRequest(request) map {response => + if(response.status == StatusCodes.OK){ + Success(response.entity) + } + else { + val statuscode = response.status + log.warning(s"Failed to perform reportFailure, server returned $statuscode") + Failure(new RuntimeException(s"Failed to perform reportFailure, server returned $statuscode")) + } + } recover {case ex => + log.warning(s"Failed to perform reportFailure, server returned, exception: $ex") + Failure(new RuntimeException(s"Failed to perform reportFailure, server returned, exception: $ex")) + }, Duration.Inf) + } + + def reportStop(id: String, configuration: Configuration):Try[ResponseEntity] = { + + val request = HttpRequest(method = HttpMethods.GET, configuration.instanceRegistryUri + "/reportStop") + Await.result(Http(system).singleRequest(request) map {response => + if(response.status == StatusCodes.OK){ + Success(response.entity) + } + else { + val statuscode = response.status + log.warning(s"Failed to perform reportStop, server returned $statuscode") + Failure(new RuntimeException(s"Failed to perform reportStop, server returned $statuscode")) + } + } recover {case ex => + log.warning(s"Failed to perform reportStop, server returned, exception: $ex") + Failure(new RuntimeException(s"Failed to perform reportStop, server returned, exception: $ex")) + }, Duration.Inf) + } + + object ReportOperationType extends Enumeration { + val Start : Value = Value("Start") + val Stop : Value = Value("Stop") + val Failure : Value = Value("Failure") + + def toOperationUriString(operation: ReportOperationType.Value, id: Long) : String = { + operation match { + case Start => + s"/reportStart?Id=$id" + case Stop => + s"/reportStop?Id=$id" + case _ => + s"/reportFailure?Id=$id" + } + } + } +} \ No newline at end of file diff --git a/src/main/scala/de/upb/cs/swt/delphi/webapi/Configuration.scala b/src/main/scala/de/upb/cs/swt/delphi/webapi/Configuration.scala index 741a312..8081d76 100644 --- a/src/main/scala/de/upb/cs/swt/delphi/webapi/Configuration.scala +++ b/src/main/scala/de/upb/cs/swt/delphi/webapi/Configuration.scala @@ -1,8 +1,23 @@ +// Copyright (C) 2018 The Delphi Team. +// See the LICENCE file distributed with this work for additional +// information regarding copyright ownership. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. package de.upb.cs.swt.delphi.webapi import com.sksamuel.elastic4s.http.ElasticDsl._ import com.sksamuel.elastic4s.{ElasticsearchClientUri, IndexAndType} -import de.upb.cs.swt.delphi.instancemanagement.InstanceEnums.ComponentType +import de.upb.cs.swt.delphi.instancemanagement.InstanceEnums.{ComponentType, InstanceState} import de.upb.cs.swt.delphi.instancemanagement.{Instance, InstanceRegistry} import scala.util.{Failure, Success, Try} @@ -23,14 +38,16 @@ class Configuration( //Server and Elasticsearch configuration lazy val elasticsearchClientUri: ElasticsearchClientUri = ElasticsearchClientUri( elasticsearchInstance.host + ":" + elasticsearchInstance.portNumber) - lazy val elasticsearchInstance : Instance = InstanceRegistry.retrieveElasticSearchInstance(this) match { + lazy val elasticsearchInstance : Instance = InstanceRegistry.retrieveElasticSearchInstance( configuration = this) match { case Success(instance) => instance case Failure(_) => Instance( None, fallbackElasticSearchHost, fallbackElasticSearchPort, "Default ElasticSearch instance", - ComponentType.ElasticSearch) + ComponentType.ElasticSearch, + None, + InstanceState.Running) } val defaultElasticSearchPort : Int = 9200 val defaultElasticSearchHost : String = "elasticsearch://localhost" @@ -40,7 +57,7 @@ class Configuration( //Server and Elasticsearch configuration case Some(_) => true case None => false } - lazy val assignedID : Option[Long] = InstanceRegistry.register(this) match { + lazy val assignedID : Option[Long] = InstanceRegistry.register(configuration = this) match { case Success(id) => Some(id) case Failure(_) => None } @@ -66,6 +83,7 @@ class Configuration( //Server and Elasticsearch configuration case None => defaultElasticSearchHost } + lazy val instanceId : Option[Long] = InstanceRegistry.handleInstanceStart(configuration = this) } From e4bf21c1b09e809e314baa0589872d76e8e60f9d Mon Sep 17 00:00:00 2001 From: ayybeeshafi Date: Thu, 18 Oct 2018 11:12:34 +0200 Subject: [PATCH 2/2] Implementation and testing of functions with respect to new API refs #23 --- .../scala/de/upb/cs/swt/delphi/webapi/Configuration.scala | 6 ++---- src/main/scala/de/upb/cs/swt/delphi/webapi/Server.scala | 8 +++++--- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/src/main/scala/de/upb/cs/swt/delphi/webapi/Configuration.scala b/src/main/scala/de/upb/cs/swt/delphi/webapi/Configuration.scala index 8081d76..e8e51e2 100644 --- a/src/main/scala/de/upb/cs/swt/delphi/webapi/Configuration.scala +++ b/src/main/scala/de/upb/cs/swt/delphi/webapi/Configuration.scala @@ -57,10 +57,8 @@ class Configuration( //Server and Elasticsearch configuration case Some(_) => true case None => false } - lazy val assignedID : Option[Long] = InstanceRegistry.register(configuration = this) match { - case Success(id) => Some(id) - case Failure(_) => None - } + lazy val assignedID : Option[Long] = InstanceRegistry.handleInstanceStart(configuration = this) + lazy val fallbackElasticSearchPort : Int = sys.env.get("DELPHI_ELASTIC_URI") match { case Some(hostString) => if(hostString.count(c => c == ':') == 3){ Try(hostString.split(":")(2).toInt) match { diff --git a/src/main/scala/de/upb/cs/swt/delphi/webapi/Server.scala b/src/main/scala/de/upb/cs/swt/delphi/webapi/Server.scala index f7f0e82..6fc9650 100644 --- a/src/main/scala/de/upb/cs/swt/delphi/webapi/Server.scala +++ b/src/main/scala/de/upb/cs/swt/delphi/webapi/Server.scala @@ -2,7 +2,9 @@ package de.upb.cs.swt.delphi.webapi import java.util.concurrent.TimeUnit -import akka.actor.ActorSystem +import akka.Done +import akka.actor.{ActorSystem, PoisonPill} +import akka.http.scaladsl.Http import akka.http.scaladsl.server.HttpApp import akka.pattern.ask import akka.stream.ActorMaterializer @@ -17,7 +19,7 @@ import spray.json._ import scala.concurrent.duration.Duration import scala.concurrent.{Await, ExecutionContext} -import scala.util.{Failure, Success} +import scala.util.{Failure, Success, Try} /** * Web server configuration for Delphi web API. @@ -110,7 +112,7 @@ object Server extends HttpApp with JsonSupport with AppLogging { Await.ready(f, Duration.Inf) Server.startServer(configuration.bindHost, configuration.bindPort) - InstanceRegistry.deregister(configuration) + InstanceRegistry.handleInstanceStop(configuration) system.terminate() }