From ac626497407159b7eee8d985d74ebeb04c6c9f8d Mon Sep 17 00:00:00 2001 From: Johannes Duesing Date: Sun, 4 Nov 2018 18:30:49 +0100 Subject: [PATCH 01/16] Adapted instance registry client to reflect the interface change in endpoints related to matching. --- .../instancemanagement/InstanceRegistry.scala | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/src/main/scala/de/upb/cs/swt/delphi/instancemanagement/InstanceRegistry.scala b/src/main/scala/de/upb/cs/swt/delphi/instancemanagement/InstanceRegistry.scala index f01d42d..65f2fe9 100644 --- a/src/main/scala/de/upb/cs/swt/delphi/instancemanagement/InstanceRegistry.scala +++ b/src/main/scala/de/upb/cs/swt/delphi/instancemanagement/InstanceRegistry.scala @@ -147,7 +147,9 @@ object InstanceRegistry extends JsonSupport with AppLogging if(!configuration.usingInstanceRegistry) { Failure(new RuntimeException("Cannot get ElasticSearch instance from Instance Registry, no Instance Registry available.")) } else { - val request = HttpRequest(method = HttpMethods.GET, configuration.instanceRegistryUri + "/matchingInstance?ComponentType=ElasticSearch") + val request = HttpRequest(method = HttpMethods.GET, + configuration.instanceRegistryUri + + s"/matchingInstance?Id=${configuration.assignedID.getOrElse(-1)}&ComponentType=ElasticSearch") Await.result(Http(system).singleRequest(request) map {response => response.status match { @@ -189,18 +191,17 @@ object InstanceRegistry extends JsonSupport with AppLogging val idToPost = configuration.elasticsearchInstance.id.getOrElse(-1L) val request = HttpRequest( method = HttpMethods.POST, - configuration.instanceRegistryUri + s"/matchingResult?Id=$idToPost&MatchingSuccessful=$isElasticSearchReachable") + configuration.instanceRegistryUri + + s"/matchingResult?CallerId=${configuration.assignedID.getOrElse(-1)}&MatchedInstanceId=$idToPost&MatchingSuccessful=$isElasticSearchReachable") Await.result(Http(system).singleRequest(request) map {response => - val status=response.status if(response.status == StatusCodes.OK){ log.info(s"Successfully posted matching result to Instance Registry.") Success() } else { - val statuscode = response.status - log.warning(s"Failed to post matching result to Instance Registry, server returned $statuscode") - Failure(new RuntimeException(s"Failed to post matching result to Instance Registry, server returned $statuscode")) + log.warning(s"Failed to post matching result to Instance Registry, server returned ${response.status}") + Failure(new RuntimeException(s"Failed to post matching result to Instance Registry, server returned ${response.status}")) } } recover {case ex => From d2f3ac7e4a565556d80c71d36835f9a49e2fa438 Mon Sep 17 00:00:00 2001 From: Johannes Duesing Date: Mon, 5 Nov 2018 18:06:47 +0100 Subject: [PATCH 02/16] Adapted instance model to reflect latest changes: attribute 'labels' was added. --- .../de/upb/cs/swt/delphi/instancemanagement/Instance.scala | 5 +++-- .../cs/swt/delphi/instancemanagement/InstanceRegistry.scala | 2 +- .../scala/de/upb/cs/swt/delphi/webapi/Configuration.scala | 3 ++- 3 files changed, 6 insertions(+), 4 deletions(-) diff --git a/src/main/scala/de/upb/cs/swt/delphi/instancemanagement/Instance.scala b/src/main/scala/de/upb/cs/swt/delphi/instancemanagement/Instance.scala index db29ab8..7325b96 100644 --- a/src/main/scala/de/upb/cs/swt/delphi/instancemanagement/Instance.scala +++ b/src/main/scala/de/upb/cs/swt/delphi/instancemanagement/Instance.scala @@ -54,7 +54,7 @@ trait JsonSupport extends SprayJsonSupport with DefaultJsonProtocol { } } - implicit val instanceFormat : JsonFormat[Instance] = jsonFormat7(Instance) + implicit val instanceFormat : JsonFormat[Instance] = jsonFormat8(Instance) } final case class Instance ( @@ -64,7 +64,8 @@ final case class Instance ( name: String, componentType: InstanceEnums.ComponentType, dockerId: Option[String], - instanceState: InstanceEnums.State + instanceState: InstanceEnums.State, + labels: List[String] ) object InstanceEnums { type ComponentType = ComponentType.Value diff --git a/src/main/scala/de/upb/cs/swt/delphi/instancemanagement/InstanceRegistry.scala b/src/main/scala/de/upb/cs/swt/delphi/instancemanagement/InstanceRegistry.scala index 65f2fe9..c908576 100644 --- a/src/main/scala/de/upb/cs/swt/delphi/instancemanagement/InstanceRegistry.scala +++ b/src/main/scala/de/upb/cs/swt/delphi/instancemanagement/InstanceRegistry.scala @@ -253,7 +253,7 @@ object InstanceRegistry extends JsonSupport with AppLogging private def createInstance(id: Option[Long], controlPort : Int, name : String) : Instance = Instance(id, InetAddress.getLocalHost.getHostAddress, - controlPort, name, ComponentType.WebApi, None, InstanceState.Running) + controlPort, name, ComponentType.WebApi, None, InstanceState.Running, List.empty[String]) def reportStart(id: String, configuration: Configuration):Try[ResponseEntity] ={ val request = HttpRequest(method = HttpMethods.GET, configuration.instanceRegistryUri + "/reportStart") diff --git a/src/main/scala/de/upb/cs/swt/delphi/webapi/Configuration.scala b/src/main/scala/de/upb/cs/swt/delphi/webapi/Configuration.scala index 3874191..1cc8b1a 100644 --- a/src/main/scala/de/upb/cs/swt/delphi/webapi/Configuration.scala +++ b/src/main/scala/de/upb/cs/swt/delphi/webapi/Configuration.scala @@ -49,7 +49,8 @@ class Configuration( //Server and Elasticsearch configuration "Default ElasticSearch instance", ComponentType.ElasticSearch, None, - InstanceState.Running) + InstanceState.Running, + List.empty[String]) } val defaultElasticSearchPort: Int = 9200 val defaultElasticSearchHost: String = "elasticsearch://localhost" From cfac628b526b4e5dba35cb3186dd05138441b2c0 Mon Sep 17 00:00:00 2001 From: Hariharan Ramanathan Date: Tue, 13 Nov 2018 22:25:17 +0100 Subject: [PATCH 03/16] Added api request limit by ip functionality #8 --- .gitignore | 1 + .../swt/delphi/webapi/RequestLimitCheck.scala | 80 +++++++++ .../instancemanagement/InstanceRegistry.scala | 124 +++++++------- .../cs/swt/delphi/webapi/DelphiRoutes.scala | 159 ++++++++++++++++++ .../upb/cs/swt/delphi/webapi/IpLogActor.scala | 63 +++++++ .../de/upb/cs/swt/delphi/webapi/Server.scala | 102 +---------- .../de/upb/cs/swt/delphi/webapi/package.scala | 52 ++++++ 7 files changed, 418 insertions(+), 163 deletions(-) create mode 100644 src/it/scala/de/upb/cs/swt/delphi/webapi/RequestLimitCheck.scala create mode 100644 src/main/scala/de/upb/cs/swt/delphi/webapi/DelphiRoutes.scala create mode 100644 src/main/scala/de/upb/cs/swt/delphi/webapi/IpLogActor.scala create mode 100644 src/main/scala/de/upb/cs/swt/delphi/webapi/package.scala diff --git a/.gitignore b/.gitignore index e051c58..e21d42e 100644 --- a/.gitignore +++ b/.gitignore @@ -3,3 +3,4 @@ .idea project/target target +local-path \ No newline at end of file diff --git a/src/it/scala/de/upb/cs/swt/delphi/webapi/RequestLimitCheck.scala b/src/it/scala/de/upb/cs/swt/delphi/webapi/RequestLimitCheck.scala new file mode 100644 index 0000000..7a8e6b3 --- /dev/null +++ b/src/it/scala/de/upb/cs/swt/delphi/webapi/RequestLimitCheck.scala @@ -0,0 +1,80 @@ +// Copyright (C) 2018 The Delphi Team. +// See the LICENCE file distributed with this work for additional +// information regarding copyright ownership. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package de.upb.cs.swt.delphi.webapi + +import akka.http.scaladsl.Http +import akka.http.scaladsl.model.{HttpRequest, HttpResponse} +import akka.http.scaladsl.unmarshalling.{Unmarshal, Unmarshaller} +import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpec} +import spray.json._ + +import scala.concurrent.duration._ +import scala.concurrent.{Await, Future} +import scala.util.{Failure, Success} + +/** + * @author Hariharan. + */ +class RequestLimitCheck extends WordSpec with Matchers with BeforeAndAfterAll with JsonSupport { + val delphiRoutes = DelphiRoutes() + val serverBinding: Future[Http.ServerBinding] = Http() + .bindAndHandle(delphiRoutes, "localhost", 8085) + + override protected def beforeAll(): Unit = { + serverBinding.onComplete { + case Success(server) => + println(s"Server started at http://${server.localAddress.getHostString}:${server.localAddress.getPort}/") + case Failure(e) => + e.printStackTrace() + sys.exit(0) + } + } + + + "Requests" should { + "throttle when limit reached" in { + def responseFuture: Future[HttpResponse] = Http() + .singleRequest(HttpRequest(uri = "http://localhost:8085/version")) + + //Completing request limit + for (i <- (1 to maxIndividualReq)) { + Await.result(responseFuture, 1.second) + } + case class LimitMsg(msg: String) + implicit val msgFormat = jsonFormat1(LimitMsg) + + val limitReachedFuture = responseFuture + limitReachedFuture.onComplete { + case Success(res) => { + val msgPromise = Unmarshal(res.entity).to[LimitMsg] + msgPromise.onComplete { + case Success(limitMsg) => { + assertResult("Request limit exceeded")(limitMsg.msg) + } + case Failure(exception) => { + fail(exception) + } + } + } + case Failure(exception) => { + fail(exception) + } + } + Await.result(system.terminate(), 5.seconds) + } + } +} diff --git a/src/main/scala/de/upb/cs/swt/delphi/instancemanagement/InstanceRegistry.scala b/src/main/scala/de/upb/cs/swt/delphi/instancemanagement/InstanceRegistry.scala index c908576..9ca7c36 100644 --- a/src/main/scala/de/upb/cs/swt/delphi/instancemanagement/InstanceRegistry.scala +++ b/src/main/scala/de/upb/cs/swt/delphi/instancemanagement/InstanceRegistry.scala @@ -18,31 +18,25 @@ package de.upb.cs.swt.delphi.instancemanagement import java.net.InetAddress -import akka.actor.ActorSystem import akka.http.scaladsl.Http import akka.http.scaladsl.model._ import akka.http.scaladsl.unmarshalling.Unmarshal -import akka.stream.ActorMaterializer import akka.util.ByteString import de.upb.cs.swt.delphi.instancemanagement.InstanceEnums.{ComponentType, InstanceState} -import de.upb.cs.swt.delphi.webapi.{AppLogging, Configuration, Server} +import de.upb.cs.swt.delphi.webapi.{AppLogging, Configuration, _} +import spray.json._ -import scala.concurrent.{Await, ExecutionContext, Future} import scala.concurrent.duration._ +import scala.concurrent.{Await, Future} import scala.util.{Failure, Success, Try} -import spray.json._ -object InstanceRegistry extends JsonSupport with AppLogging -{ +object InstanceRegistry extends JsonSupport with AppLogging { - implicit val system : ActorSystem = Server.system - implicit val ec : ExecutionContext = system.dispatcher - implicit val materializer : ActorMaterializer = Server.materializer - lazy val instanceIdFromEnv : Option[Long] = Try[Long](sys.env("INSTANCE_ID").toLong).toOption + lazy val instanceIdFromEnv: Option[Long] = Try[Long](sys.env("INSTANCE_ID").toLong).toOption - def handleInstanceStart(configuration: Configuration) : Option[Long] = { + def handleInstanceStart(configuration: Configuration): Option[Long] = { instanceIdFromEnv match { case Some(id) => reportStart(configuration) match { @@ -57,49 +51,49 @@ object InstanceRegistry extends JsonSupport with AppLogging } } - def handleInstanceStop(configuration: Configuration) : Try[Unit] = { - if(instanceIdFromEnv.isDefined) { + def handleInstanceStop(configuration: Configuration): Try[Unit] = { + if (instanceIdFromEnv.isDefined) { reportStop(configuration) } else { deregister(configuration) } } - def handleInstanceFailure(configuration: Configuration) : Try[Unit] = { - if(instanceIdFromEnv.isDefined) { + def handleInstanceFailure(configuration: Configuration): Try[Unit] = { + if (instanceIdFromEnv.isDefined) { reportFailure(configuration) } else { deregister(configuration) } } - def reportStart(configuration: Configuration) : Try[Unit] = executeReportOperation(configuration, ReportOperationType.Start) + def reportStart(configuration: Configuration): Try[Unit] = executeReportOperation(configuration, ReportOperationType.Start) - def reportStop(configuration: Configuration) : Try[Unit] = { - if(configuration.usingInstanceRegistry) { + def reportStop(configuration: Configuration): Try[Unit] = { + if (configuration.usingInstanceRegistry) { executeReportOperation(configuration, ReportOperationType.Stop) } else { Failure(new RuntimeException("Cannot report stop, no instance registry available.")) } } - def reportFailure(configuration: Configuration) : Try[Unit] = { - if(configuration.usingInstanceRegistry){ + def reportFailure(configuration: Configuration): Try[Unit] = { + if (configuration.usingInstanceRegistry) { executeReportOperation(configuration, ReportOperationType.Failure) } else { Failure(new RuntimeException("Cannot report failure, no instance registry available.")) } } - private def executeReportOperation(configuration: Configuration, operationType: ReportOperationType.Value) : Try[Unit] = { + private def executeReportOperation(configuration: Configuration, operationType: ReportOperationType.Value): Try[Unit] = { instanceIdFromEnv match { case Some(id) => val request = HttpRequest( method = HttpMethods.POST, configuration.instanceRegistryUri + ReportOperationType.toOperationUriString(operationType, id)) - Await.result(Http(system).singleRequest(request) map {response => - if(response.status == StatusCodes.OK){ + Await.result(Http(system).singleRequest(request) map { response => + if (response.status == StatusCodes.OK) { log.info(s"Successfully reported ${operationType.toString} to Instance Registry.") Success() } @@ -108,7 +102,7 @@ object InstanceRegistry extends JsonSupport with AppLogging Failure(new RuntimeException(s"Failed to report ${operationType.toString} to Instance Registry, server returned ${response.status}")) } - } recover {case ex => + } recover { case ex => log.warning(s"Failed to report ${operationType.toString} to Instance Registry, exception: $ex") Failure(new RuntimeException(s"Failed to report ${operationType.toString} to Instance Registry, exception: $ex")) }, Duration.Inf) @@ -117,11 +111,12 @@ object InstanceRegistry extends JsonSupport with AppLogging Failure(new RuntimeException(s"Cannot report ${operationType.toString} to Instance Registry, no instance id is present in env var 'INSTANCE_ID'.")) } } - def register(configuration: Configuration) :Try[Long] = { - val instance = createInstance(None,configuration.bindPort, configuration.instanceName) - Await.result(postInstance(instance, configuration.instanceRegistryUri + "/register") map {response => - if(response.status == StatusCodes.OK){ + def register(configuration: Configuration): Try[Long] = { + val instance = createInstance(None, configuration.bindPort, configuration.instanceName) + + Await.result(postInstance(instance, configuration.instanceRegistryUri + "/register") map { response => + if (response.status == StatusCodes.OK) { Await.result(Unmarshal(response.entity).to[String] map { assignedID => val id = assignedID.toLong log.info(s"Successfully registered at Instance Registry, got ID $id.") @@ -137,25 +132,25 @@ object InstanceRegistry extends JsonSupport with AppLogging Failure(new RuntimeException(s"Failed to register at Instance Registry, server returned $statuscode")) } - } recover {case ex => + } recover { case ex => log.warning(s"Failed to register at Instance Registry, exception: $ex") Failure(ex) }, Duration.Inf) } - def retrieveElasticSearchInstance(configuration: Configuration) : Try[Instance] = { - if(!configuration.usingInstanceRegistry) { + def retrieveElasticSearchInstance(configuration: Configuration): Try[Instance] = { + if (!configuration.usingInstanceRegistry) { Failure(new RuntimeException("Cannot get ElasticSearch instance from Instance Registry, no Instance Registry available.")) } else { val request = HttpRequest(method = HttpMethods.GET, configuration.instanceRegistryUri + s"/matchingInstance?Id=${configuration.assignedID.getOrElse(-1)}&ComponentType=ElasticSearch") - Await.result(Http(system).singleRequest(request) map {response => + Await.result(Http(system).singleRequest(request) map { response => response.status match { case StatusCodes.OK => try { - val instanceString : String = Await.result(response.entity.dataBytes.runFold(ByteString(""))(_ ++ _).map(_.utf8String), 5 seconds) + val instanceString: String = Await.result(response.entity.dataBytes.runFold(ByteString(""))(_ ++ _).map(_.utf8String), 5 seconds) val esInstance = instanceString.parseJson.convertTo[Instance](instanceFormat) val elasticIP = esInstance.host log.info(s"Instance Registry assigned ElasticSearch instance at $elasticIP") @@ -180,12 +175,12 @@ object InstanceRegistry extends JsonSupport with AppLogging } } - def sendMatchingResult(isElasticSearchReachable : Boolean, configuration: Configuration) : Try[Unit] = { + def sendMatchingResult(isElasticSearchReachable: Boolean, configuration: Configuration): Try[Unit] = { - if(!configuration.usingInstanceRegistry) { + if (!configuration.usingInstanceRegistry) { Failure(new RuntimeException("Cannot post matching result to Instance Registry, no Instance Registry available.")) } else { - if(configuration.elasticsearchInstance.id.isEmpty) { + if (configuration.elasticsearchInstance.id.isEmpty) { Failure(new RuntimeException("Cannot post matching result to Instance Registry, assigned ElasticSearch instance has no ID.")) } else { val idToPost = configuration.elasticsearchInstance.id.getOrElse(-1L) @@ -194,8 +189,8 @@ object InstanceRegistry extends JsonSupport with AppLogging configuration.instanceRegistryUri + s"/matchingResult?CallerId=${configuration.assignedID.getOrElse(-1)}&MatchedInstanceId=$idToPost&MatchingSuccessful=$isElasticSearchReachable") - Await.result(Http(system).singleRequest(request) map {response => - if(response.status == StatusCodes.OK){ + Await.result(Http(system).singleRequest(request) map { response => + if (response.status == StatusCodes.OK) { log.info(s"Successfully posted matching result to Instance Registry.") Success() } @@ -204,7 +199,7 @@ object InstanceRegistry extends JsonSupport with AppLogging Failure(new RuntimeException(s"Failed to post matching result to Instance Registry, server returned ${response.status}")) } - } recover {case ex => + } recover { case ex => log.warning(s"Failed to post matching result to Instance Registry, exception: $ex") Failure(new RuntimeException(s"Failed to post matching result tot Instance Registry, exception: $ex")) }, Duration.Inf) @@ -213,16 +208,16 @@ object InstanceRegistry extends JsonSupport with AppLogging } - def deregister(configuration: Configuration) : Try[Unit] = { - if(!configuration.usingInstanceRegistry){ + def deregister(configuration: Configuration): Try[Unit] = { + if (!configuration.usingInstanceRegistry) { Failure(new RuntimeException("Cannot deregister from Instance Registry, no Instance Registry available.")) } else { - val id : Long = configuration.assignedID.getOrElse(-1L) + val id: Long = configuration.assignedID.getOrElse(-1L) val request = HttpRequest(method = HttpMethods.POST, configuration.instanceRegistryUri + s"/deregister?Id=$id") - Await.result(Http(system).singleRequest(request) map {response => - if(response.status == StatusCodes.OK){ + Await.result(Http(system).singleRequest(request) map { response => + if (response.status == StatusCodes.OK) { log.info("Successfully deregistered from Instance Registry.") Success() } @@ -232,14 +227,14 @@ object InstanceRegistry extends JsonSupport with AppLogging Failure(new RuntimeException(s"Failed to deregister from Instance Registry, server returned $statuscode")) } - } recover {case ex => + } recover { case ex => log.warning(s"Failed to deregister to Instance Registry, exception: $ex") Failure(ex) }, Duration.Inf) } } - def postInstance(instance : Instance, uri: String) () : Future[HttpResponse] = { + def postInstance(instance: Instance, uri: String)(): Future[HttpResponse] = { try { val request = HttpRequest(method = HttpMethods.POST, uri = uri, entity = instance.toJson(instanceFormat).toString()) Http(system).singleRequest(request) @@ -251,14 +246,14 @@ object InstanceRegistry extends JsonSupport with AppLogging } - private def createInstance(id: Option[Long], controlPort : Int, name : String) : Instance = + private def createInstance(id: Option[Long], controlPort: Int, name: String): Instance = Instance(id, InetAddress.getLocalHost.getHostAddress, controlPort, name, ComponentType.WebApi, None, InstanceState.Running, List.empty[String]) - def reportStart(id: String, configuration: Configuration):Try[ResponseEntity] ={ + def reportStart(id: String, configuration: Configuration): Try[ResponseEntity] = { val request = HttpRequest(method = HttpMethods.GET, configuration.instanceRegistryUri + "/reportStart") - Await.result(Http(system).singleRequest(request) map {response => - if(response.status == StatusCodes.OK){ + Await.result(Http(system).singleRequest(request) map { response => + if (response.status == StatusCodes.OK) { Success(response.entity) } else { @@ -266,17 +261,17 @@ object InstanceRegistry extends JsonSupport with AppLogging log.warning(s"Failed to perform reportStart, server returned $statuscode") Failure(new RuntimeException(s"Failed to perform reportStart, server returned $statuscode")) } - } recover {case ex => + } recover { case ex => log.warning(s"Failed to perform reportStart, exception: $ex") Failure(new RuntimeException(s"Failed to perform reportStart, server returned, exception: $ex")) }, Duration.Inf) } - def reportFailure(id: String, configuration: Configuration):Try[ResponseEntity] = { + def reportFailure(id: String, configuration: Configuration): Try[ResponseEntity] = { val request = HttpRequest(method = HttpMethods.GET, configuration.instanceRegistryUri + "/reportFailure") - Await.result(Http(system).singleRequest(request) map {response => - if(response.status == StatusCodes.OK){ + Await.result(Http(system).singleRequest(request) map { response => + if (response.status == StatusCodes.OK) { Success(response.entity) } else { @@ -284,17 +279,17 @@ object InstanceRegistry extends JsonSupport with AppLogging log.warning(s"Failed to perform reportFailure, server returned $statuscode") Failure(new RuntimeException(s"Failed to perform reportFailure, server returned $statuscode")) } - } recover {case ex => + } recover { case ex => log.warning(s"Failed to perform reportFailure, server returned, exception: $ex") Failure(new RuntimeException(s"Failed to perform reportFailure, server returned, exception: $ex")) }, Duration.Inf) } - def reportStop(id: String, configuration: Configuration):Try[ResponseEntity] = { + def reportStop(id: String, configuration: Configuration): Try[ResponseEntity] = { val request = HttpRequest(method = HttpMethods.GET, configuration.instanceRegistryUri + "/reportStop") - Await.result(Http(system).singleRequest(request) map {response => - if(response.status == StatusCodes.OK){ + Await.result(Http(system).singleRequest(request) map { response => + if (response.status == StatusCodes.OK) { Success(response.entity) } else { @@ -302,18 +297,18 @@ object InstanceRegistry extends JsonSupport with AppLogging log.warning(s"Failed to perform reportStop, server returned $statuscode") Failure(new RuntimeException(s"Failed to perform reportStop, server returned $statuscode")) } - } recover {case ex => + } recover { case ex => log.warning(s"Failed to perform reportStop, server returned, exception: $ex") Failure(new RuntimeException(s"Failed to perform reportStop, server returned, exception: $ex")) }, Duration.Inf) } object ReportOperationType extends Enumeration { - val Start : Value = Value("Start") - val Stop : Value = Value("Stop") - val Failure : Value = Value("Failure") + val Start: Value = Value("Start") + val Stop: Value = Value("Stop") + val Failure: Value = Value("Failure") - def toOperationUriString(operation: ReportOperationType.Value, id: Long) : String = { + def toOperationUriString(operation: ReportOperationType.Value, id: Long): String = { operation match { case Start => s"/reportStart?Id=$id" @@ -324,4 +319,5 @@ object InstanceRegistry extends JsonSupport with AppLogging } } } + } \ No newline at end of file diff --git a/src/main/scala/de/upb/cs/swt/delphi/webapi/DelphiRoutes.scala b/src/main/scala/de/upb/cs/swt/delphi/webapi/DelphiRoutes.scala new file mode 100644 index 0000000..eab3de9 --- /dev/null +++ b/src/main/scala/de/upb/cs/swt/delphi/webapi/DelphiRoutes.scala @@ -0,0 +1,159 @@ +// Copyright (C) 2018 The Delphi Team. +// See the LICENCE file distributed with this work for additional +// information regarding copyright ownership. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package de.upb.cs.swt.delphi.webapi + +import akka.NotUsed +import akka.actor.ActorRef +import akka.http.scaladsl.model._ +import akka.http.scaladsl.server.Directives._ +import akka.http.scaladsl.server.Route +import akka.pattern.ask +import akka.stream.scaladsl.Source +import de.upb.cs.swt.delphi.webapi.IpLogActor._ +import de.upb.cs.swt.delphi.webapi.StatisticsJson._ +import de.upb.cs.swt.delphi.webapi.artifacts.ArtifactJson._ +import de.upb.cs.swt.delphi.webapi.search.QueryRequestJson._ +import de.upb.cs.swt.delphi.webapi.search.{QueryRequest, SearchQuery} +import spray.json._ + +import scala.concurrent.duration._ +import scala.util.{Failure, Success} + + +class DelphiRoutes(requestLimiter: RequestLimitScheduler) extends JsonSupport with AppLogging { + + def routes: Route = { + requestLimiter.acceptOnValidLimit { + apiRoutes + } + } + + def apiRoutes: Route = { + path("version") { + version + } ~ + path("features") { + features + } ~ + path("statistics") { + statistics + } ~ + pathPrefix("search") { + search + } ~ + pathPrefix("retrieve" / Remaining) { identifier => retrieve(identifier) + } + } + + private def version = { + get { + complete { + BuildInfo.version + } + } + } + + private val featureExtractor = new FeatureQuery(configuration) + + private def features = { + get { + parameter('pretty.?) { (pretty) => + complete( + //TODO: Introduce failure concept for feature extractor + prettyPrint(pretty, featureExtractor.featureList.toJson) + ) + } + } + } + + + private def statistics = { + get { + parameter('pretty.?) { (pretty) => + complete { + val result = new StatisticsQuery(configuration).retrieveStandardStatistics + result match { + case Some(stats) => { + prettyPrint(pretty, stats.toJson) + } + case _ => HttpResponse(StatusCodes.InternalServerError) + } + } + } + } + } + + private def retrieve(identifier: String): Route = { + get { + parameter('pretty.?) { (pretty) => + complete( + RetrieveQuery.retrieve(identifier) match { + case Some(result) => prettyPrint(pretty, result.toJson) + case None => HttpResponse(StatusCodes.NotFound) + } + ) + } + } + } + + def search: Route = { + post { + parameter('pretty.?) { (pretty) => + entity(as[QueryRequest]) { input => + log.info(s"Received search query: ${input.query}") + complete( + new SearchQuery(configuration, featureExtractor).search(input) match { + case Success(result) => prettyPrint(pretty, result.toJson) + case Failure(e) => e.getMessage + } + ) + } + } + } + } +} + +object DelphiRoutes { + + private val ipLogActor = system.actorOf(IpLogActor.props) + private val requestLimiter = new RequestLimitScheduler(ipLogActor) + private val routes = new DelphiRoutes(requestLimiter).routes + + def apply(): Route = routes +} + +private final class RequestLimitScheduler(ipLogActor: ActorRef) extends JsonSupport { + Source.tick(0.second, refreshRate, NotUsed) + .runForeach(_ => { + ipLogActor ! Reset + })(materializer) + + def acceptOnValidLimit(apiRoutes: Route): Route = { + extractClientIP { ip => + val promise = (ipLogActor ? Accepted(ip.toString())).mapTo[Boolean] + onSuccess(promise) { + success => + if (success) { + apiRoutes + } else { + val res = Map("msg" -> "Request limit exceeded") + complete(res.toJson) + } + } + } + } +} \ No newline at end of file diff --git a/src/main/scala/de/upb/cs/swt/delphi/webapi/IpLogActor.scala b/src/main/scala/de/upb/cs/swt/delphi/webapi/IpLogActor.scala new file mode 100644 index 0000000..c411953 --- /dev/null +++ b/src/main/scala/de/upb/cs/swt/delphi/webapi/IpLogActor.scala @@ -0,0 +1,63 @@ +// Copyright (C) 2018 The Delphi Team. +// See the LICENCE file distributed with this work for additional +// information regarding copyright ownership. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package de.upb.cs.swt.delphi.webapi + +import akka.actor.{Actor, Props} +import de.upb.cs.swt.delphi.webapi.IpLogActor._ + +import scala.collection.mutable.ListBuffer + +/** + * @author Hariharan. + */ + + +class IpLogActor extends Actor with AppLogging { + private var ipStore = ListBuffer[String]() + + + override def receive: Receive = { + case Add(ip) => { + ipStore += ip + } + case Reset => { + ipStore = ListBuffer[String]() + } + case Accepted(ip) => { + val hostname = ip.split(":")(0) + self ! Add(hostname) + val noOfReqFromIp = ipStore.count(_.equals(hostname)) + val validTotalReq = ipStore.size < maxTotalNoRequest + val validIndividualReq = noOfReqFromIp < maxIndividualReq + val validReq = validTotalReq && validIndividualReq + val accept = if (validReq) true else false + sender() ! accept + } + } +} + + +object IpLogActor { + + final case class Add(ip: String) + + case object Reset + + final case class Accepted(ip: String) + + def props: Props = Props(new IpLogActor) +} diff --git a/src/main/scala/de/upb/cs/swt/delphi/webapi/Server.scala b/src/main/scala/de/upb/cs/swt/delphi/webapi/Server.scala index d59dd29..824200d 100644 --- a/src/main/scala/de/upb/cs/swt/delphi/webapi/Server.scala +++ b/src/main/scala/de/upb/cs/swt/delphi/webapi/Server.scala @@ -16,114 +16,19 @@ package de.upb.cs.swt.delphi.webapi -import java.util.concurrent.TimeUnit - -import akka.actor.ActorSystem -import akka.http.scaladsl.model.{HttpResponse, StatusCodes} import akka.http.scaladsl.server.{HttpApp, Route} -import akka.stream.ActorMaterializer -import akka.util.Timeout import de.upb.cs.swt.delphi.instancemanagement.InstanceRegistry -import de.upb.cs.swt.delphi.webapi.artifacts.ArtifactJson._ -import de.upb.cs.swt.delphi.webapi.search.QueryRequestJson._ -import de.upb.cs.swt.delphi.webapi.search.{QueryRequest, SearchQuery} -import spray.json._ - -import scala.concurrent.ExecutionContext -import scala.util.{Failure, Success} /** * Web server configuration for Delphi web API. */ object Server extends HttpApp with JsonSupport with AppLogging { + val delphiRoutes = DelphiRoutes() - implicit val system = ActorSystem("delphi-webapi") - implicit val materializer = ActorMaterializer() - - private implicit val configuration = new Configuration() - private implicit val timeout = Timeout(5, TimeUnit.SECONDS) - - - override def routes: Route = - path("version") { - version - } ~ - path("features") { - features - } ~ - path("statistics") { - statistics - } ~ - pathPrefix("search" ) { search } ~ - pathPrefix("retrieve" / Remaining) { identifier => retrieve(identifier) } - - - private def version = { - get { - complete { - BuildInfo.version - } - } - } - - private val featureExtractor = new FeatureQuery(configuration) - - private def features = { - get { - parameter('pretty.?) { (pretty) => - complete( - //TODO: Introduce failure concept for feature extractor - prettyPrint(pretty, featureExtractor.featureList.toJson) - ) - } - } - } - - private def statistics = { - get { - parameter('pretty.?) { (pretty) => - complete { - val result = new StatisticsQuery(configuration).retrieveStandardStatistics - result match { - case Some(stats) => { - import StatisticsJson._ - prettyPrint(pretty, stats.toJson) - } - case _ => HttpResponse(StatusCodes.InternalServerError) - } - } - } - } + override def routes: Route = { + delphiRoutes } - private def retrieve(identifier: String): Route = { - get { - parameter('pretty.?) { (pretty) => - complete( - RetrieveQuery.retrieve(identifier) match { - case Some(result) => prettyPrint(pretty, result.toJson) - case None => HttpResponse(StatusCodes.NotFound) - } - ) - } - } - } - - def search: Route = { - post { - parameter('pretty.?) { (pretty) => - entity(as[QueryRequest]) { input => - log.info(s"Received search query: ${input.query}") - complete( - new SearchQuery(configuration, featureExtractor).search(input) match { - case Success(result) => prettyPrint(pretty, result.toJson) - case Failure(e) => e.getMessage - } - ) - } - } - } - } def main(args: Array[String]): Unit = { sys.addShutdownHook({ @@ -134,7 +39,6 @@ object Server extends HttpApp with JsonSupport with AppLogging { StartupCheck.check(configuration) Server.startServer(configuration.bindHost, configuration.bindPort, system) - implicit val ec: ExecutionContext = system.dispatcher val terminationFuture = system.terminate() terminationFuture.onComplete { diff --git a/src/main/scala/de/upb/cs/swt/delphi/webapi/package.scala b/src/main/scala/de/upb/cs/swt/delphi/webapi/package.scala new file mode 100644 index 0000000..907f44d --- /dev/null +++ b/src/main/scala/de/upb/cs/swt/delphi/webapi/package.scala @@ -0,0 +1,52 @@ +// Copyright (C) 2018 The Delphi Team. +// See the LICENCE file distributed with this work for additional +// information regarding copyright ownership. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package de.upb.cs.swt.delphi + +import java.util.concurrent.TimeUnit + +import akka.actor.ActorSystem +import akka.stream.ActorMaterializer +import akka.util.Timeout + +import scala.concurrent.ExecutionContext +import scala.concurrent.duration._ + + +package object webapi { + + implicit val system: ActorSystem = ActorSystem("delphi-webapi") + implicit val materializer: ActorMaterializer = ActorMaterializer() + implicit val ec: ExecutionContext = system.dispatcher + + implicit val configuration: Configuration = new Configuration() + + val defaultTimeout = 5 + implicit val timeout: Timeout = Timeout(defaultTimeout, TimeUnit.SECONDS) + + /** + * Maximum no of requests allowed until `refreshRate` is triggered. + */ + val maxTotalNoRequest = 2000 + /** + * Maximum no of requests allowed for an individual until `refreshRate` is triggered. + */ + val maxIndividualReq = 200 + /** + * Used by `Source.tick` to refresh ip log periodically + */ + val refreshRate: FiniteDuration = 5.minutes +} From 13ca827ac13afddfd62470a1acb901f1f316bd02 Mon Sep 17 00:00:00 2001 From: Johannes Duesing Date: Thu, 22 Nov 2018 12:30:07 +0100 Subject: [PATCH 04/16] Adapted api to latest changes of the registry api Instances now have link-lists as attributes --- .../delphi/instancemanagement/Instance.scala | 115 ++++++++++++++---- .../instancemanagement/InstanceLink.scala | 38 ++++++ .../instancemanagement/InstanceRegistry.scala | 4 +- .../cs/swt/delphi/webapi/Configuration.scala | 6 +- 4 files changed, 132 insertions(+), 31 deletions(-) create mode 100644 src/main/scala/de/upb/cs/swt/delphi/instancemanagement/InstanceLink.scala diff --git a/src/main/scala/de/upb/cs/swt/delphi/instancemanagement/Instance.scala b/src/main/scala/de/upb/cs/swt/delphi/instancemanagement/Instance.scala index 7325b96..1d9babc 100644 --- a/src/main/scala/de/upb/cs/swt/delphi/instancemanagement/Instance.scala +++ b/src/main/scala/de/upb/cs/swt/delphi/instancemanagement/Instance.scala @@ -15,73 +15,134 @@ // limitations under the License. package de.upb.cs.swt.delphi.instancemanagement + import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport +import de.upb.cs.swt.delphi.instancemanagement.InstanceEnums.{ComponentType, InstanceState} import spray.json.{DefaultJsonProtocol, DeserializationException, JsString, JsValue, JsonFormat} -trait JsonSupport extends SprayJsonSupport with DefaultJsonProtocol { +/** + * Trait defining the implicit JSON formats needed to work with Instances + */ +trait InstanceJsonSupport extends SprayJsonSupport with DefaultJsonProtocol with InstanceLinkJsonSupport { - implicit val componentTypeFormat : JsonFormat[InstanceEnums.ComponentType] = new JsonFormat[InstanceEnums.ComponentType] { + //Custom JSON format for an ComponentType + implicit val componentTypeFormat : JsonFormat[ComponentType] = new JsonFormat[ComponentType] { - def write(compType : InstanceEnums.ComponentType) = JsString(compType.toString) + /** + * Custom write method for serializing an ComponentType + * @param compType The ComponentType to serialize + * @return JsString containing the serialized value + */ + def write(compType : ComponentType) = JsString(compType.toString) - def read(value: JsValue) : InstanceEnums.ComponentType = value match { + /** + * Custom read method for deserialization of an ComponentType + * @param value JsValue to deserialize (must be a JsString) + * @return ComponentType that has been read + * @throws DeserializationException Exception thrown when JsValue is in incorrect format + */ + def read(value: JsValue) : ComponentType = value match { case JsString(s) => s match { - case "Crawler" => InstanceEnums.ComponentType.Crawler - case "WebApi" => InstanceEnums.ComponentType.WebApi - case "WebApp" => InstanceEnums.ComponentType.WebApp - case "DelphiManagement" => InstanceEnums.ComponentType.DelphiManagement - case "ElasticSearch" => InstanceEnums.ComponentType.ElasticSearch + case "Crawler" => ComponentType.Crawler + case "WebApi" => ComponentType.WebApi + case "WebApp" => ComponentType.WebApp + case "DelphiManagement" => ComponentType.DelphiManagement + case "ElasticSearch" => ComponentType.ElasticSearch case x => throw DeserializationException(s"Unexpected string value $x for component type.") } - case y => throw DeserializationException(s"Unexpected type $y while deserializing component type.") + case y => throw DeserializationException(s"Unexpected type $y during deserialization component type.") } } - implicit val stateFormat : JsonFormat[InstanceEnums.State] = new JsonFormat[InstanceEnums.State] { + //Custom JSON format for an InstanceState + implicit val stateFormat : JsonFormat[InstanceState] = new JsonFormat[InstanceState] { - def write(compType : InstanceEnums.State) = JsString(compType.toString) + /** + * Custom write method for serializing an InstanceState + * @param state The InstanceState to serialize + * @return JsString containing the serialized value + */ + def write(state : InstanceState) = JsString(state.toString) - def read(value: JsValue) : InstanceEnums.State = value match { + /** + * Custom read method for deserialization of an InstanceState + * @param value JsValue to deserialize (must be a JsString) + * @return InstanceState that has been read + * @throws DeserializationException Exception thrown when JsValue is in incorrect format + */ + def read(value: JsValue) : InstanceState = value match { case JsString(s) => s match { - case "Running" => InstanceEnums.InstanceState.Running - case "Stopped" => InstanceEnums.InstanceState.Stopped - case "Failed" => InstanceEnums.InstanceState.Failed - case "Paused" => InstanceEnums.InstanceState.Paused - case "NotReachable" => InstanceEnums.InstanceState.NotReachable + case "Running" => InstanceState.Running + case "Stopped" => InstanceState.Stopped + case "Failed" => InstanceState.Failed + case "Paused" => InstanceState.Paused + case "NotReachable" => InstanceState.NotReachable + case "Deploying" => InstanceState.Deploying case x => throw DeserializationException(s"Unexpected string value $x for instance state.") } - case y => throw DeserializationException(s"Unexpected type $y while deserializing instance state.") + case y => throw DeserializationException(s"Unexpected type $y during deserialization instance state.") } } - implicit val instanceFormat : JsonFormat[Instance] = jsonFormat8(Instance) + //JSON format for Instances + implicit val instanceFormat : JsonFormat[Instance] = jsonFormat10(Instance) } +/** + * The instance type used for transmitting data about an instance from an to the registry + * @param id Id of the instance. This is an Option[Long], as an registering instance will not yet have an id. + * @param host Host of the instance. + * @param portNumber Port the instance is reachable at. + * @param name Name of the instance + * @param componentType ComponentType of the instance. + * @param dockerId The docker container id of the instance. This is an Option[String], as not all instance have to be docker containers. + * @param instanceState State of the instance + */ final case class Instance ( id: Option[Long], host: String, portNumber: Long, name: String, - componentType: InstanceEnums.ComponentType, + componentType: ComponentType, dockerId: Option[String], - instanceState: InstanceEnums.State, - labels: List[String] + instanceState: InstanceState, + labels: List[String], + linksTo: List[InstanceLink], + linksFrom: List[InstanceLink] ) + +/** + * Enumerations concerning instances + */ object InstanceEnums { + + //Type to use when working with component types type ComponentType = ComponentType.Value + + /** + * ComponentType enumeration defining the valid types of delphi components + */ object ComponentType extends Enumeration { val Crawler : Value = Value("Crawler") - val ElasticSearch : Value = Value("ElasticSearch") val WebApi : Value = Value("WebApi") val WebApp : Value = Value("WebApp") val DelphiManagement : Value = Value("DelphiManagement") + val ElasticSearch : Value = Value("ElasticSearch") } - type State = InstanceState.Value + + //Type to use when working with instance states + type InstanceState = InstanceState.Value + + /** + * InstanceState enumeration defining the valid states for instances of delphi components + */ object InstanceState extends Enumeration { + val Deploying : Value = Value("Deploying") val Running : Value = Value("Running") - val Paused : Value = Value("Paused") val Stopped : Value = Value("Stopped") val Failed : Value = Value("Failed") + val Paused : Value = Value("Paused") val NotReachable : Value = Value("NotReachable") } -} + +} \ No newline at end of file diff --git a/src/main/scala/de/upb/cs/swt/delphi/instancemanagement/InstanceLink.scala b/src/main/scala/de/upb/cs/swt/delphi/instancemanagement/InstanceLink.scala new file mode 100644 index 0000000..b3641b0 --- /dev/null +++ b/src/main/scala/de/upb/cs/swt/delphi/instancemanagement/InstanceLink.scala @@ -0,0 +1,38 @@ +package de.upb.cs.swt.delphi.instancemanagement + +import LinkEnums.LinkState +import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport +import spray.json.{DefaultJsonProtocol, DeserializationException, JsString, JsValue, JsonFormat} + +trait InstanceLinkJsonSupport extends SprayJsonSupport with DefaultJsonProtocol { + + implicit val linkStateFormat: JsonFormat[LinkState] = new JsonFormat[LinkState] { + override def read(value: JsValue): LinkState = value match { + case JsString(s) => s match { + case "Assigned" => LinkState.Assigned + case "Outdated" => LinkState.Outdated + case "Failed" => LinkState.Failed + case x => throw DeserializationException(s"Unexpected string value $x for LinkState.") + } + case y => throw DeserializationException(s"Unexpected type $y during deserialization of LinkState") + } + + override def write(linkState: LinkState): JsValue = JsString(linkState.toString) + } + + implicit val instanceLinkFormat: JsonFormat[InstanceLink] = + jsonFormat3(InstanceLink) +} + + +final case class InstanceLink(idFrom: Long, idTo:Long, linkState: LinkState) + +object LinkEnums { + type LinkState = LinkState.Value + + object LinkState extends Enumeration { + val Assigned: Value = Value("Assigned") + val Failed: Value = Value("Failed") + val Outdated: Value = Value("Outdated") + } +} \ No newline at end of file diff --git a/src/main/scala/de/upb/cs/swt/delphi/instancemanagement/InstanceRegistry.scala b/src/main/scala/de/upb/cs/swt/delphi/instancemanagement/InstanceRegistry.scala index 9ca7c36..3aa6b28 100644 --- a/src/main/scala/de/upb/cs/swt/delphi/instancemanagement/InstanceRegistry.scala +++ b/src/main/scala/de/upb/cs/swt/delphi/instancemanagement/InstanceRegistry.scala @@ -30,7 +30,7 @@ import scala.concurrent.duration._ import scala.concurrent.{Await, Future} import scala.util.{Failure, Success, Try} -object InstanceRegistry extends JsonSupport with AppLogging { +object InstanceRegistry extends InstanceJsonSupport with AppLogging { lazy val instanceIdFromEnv: Option[Long] = Try[Long](sys.env("INSTANCE_ID").toLong).toOption @@ -248,7 +248,7 @@ object InstanceRegistry extends JsonSupport with AppLogging { private def createInstance(id: Option[Long], controlPort: Int, name: String): Instance = Instance(id, InetAddress.getLocalHost.getHostAddress, - controlPort, name, ComponentType.WebApi, None, InstanceState.Running, List.empty[String]) + controlPort, name, ComponentType.WebApi, None, InstanceState.Running, List.empty[String], List.empty[InstanceLink], List.empty[InstanceLink]) def reportStart(id: String, configuration: Configuration): Try[ResponseEntity] = { val request = HttpRequest(method = HttpMethods.GET, configuration.instanceRegistryUri + "/reportStart") diff --git a/src/main/scala/de/upb/cs/swt/delphi/webapi/Configuration.scala b/src/main/scala/de/upb/cs/swt/delphi/webapi/Configuration.scala index 1cc8b1a..84de7ed 100644 --- a/src/main/scala/de/upb/cs/swt/delphi/webapi/Configuration.scala +++ b/src/main/scala/de/upb/cs/swt/delphi/webapi/Configuration.scala @@ -19,7 +19,7 @@ package de.upb.cs.swt.delphi.webapi import com.sksamuel.elastic4s.http.ElasticDsl._ import com.sksamuel.elastic4s.{ElasticsearchClientUri, Index, IndexAndType} import de.upb.cs.swt.delphi.instancemanagement.InstanceEnums.{ComponentType, InstanceState} -import de.upb.cs.swt.delphi.instancemanagement.{Instance, InstanceRegistry} +import de.upb.cs.swt.delphi.instancemanagement.{Instance, InstanceLink, InstanceRegistry} import scala.util.{Failure, Success, Try} @@ -50,7 +50,9 @@ class Configuration( //Server and Elasticsearch configuration ComponentType.ElasticSearch, None, InstanceState.Running, - List.empty[String]) + List.empty[String], + List.empty[InstanceLink], + List.empty[InstanceLink]) } val defaultElasticSearchPort: Int = 9200 val defaultElasticSearchHost: String = "elasticsearch://localhost" From d2ddb3434916a4bc6454321fb4149809c3830d63 Mon Sep 17 00:00:00 2001 From: Johannes Duesing Date: Mon, 26 Nov 2018 21:20:43 +0100 Subject: [PATCH 05/16] Inserted missing file header --- .../delphi/instancemanagement/InstanceLink.scala | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/src/main/scala/de/upb/cs/swt/delphi/instancemanagement/InstanceLink.scala b/src/main/scala/de/upb/cs/swt/delphi/instancemanagement/InstanceLink.scala index b3641b0..9377251 100644 --- a/src/main/scala/de/upb/cs/swt/delphi/instancemanagement/InstanceLink.scala +++ b/src/main/scala/de/upb/cs/swt/delphi/instancemanagement/InstanceLink.scala @@ -1,3 +1,18 @@ +// Copyright (C) 2018 The Delphi Team. +// See the LICENCE file distributed with this work for additional +// information regarding copyright ownership. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. package de.upb.cs.swt.delphi.instancemanagement import LinkEnums.LinkState From 5cce2b13765fdda9fc0fc6a265cc80f4dc6c9af5 Mon Sep 17 00:00:00 2001 From: Johannes Duesing Date: Wed, 12 Dec 2018 21:56:37 +0100 Subject: [PATCH 06/16] Introducing JWT based authentication for communication with registry --- build.sbt | 2 + .../instancemanagement/InstanceRegistry.scala | 12 +++--- .../cs/swt/delphi/webapi/Configuration.scala | 2 + .../webapi/authorization/AccessToken.scala | 37 +++++++++++++++++++ .../webapi/authorization/AuthProvider.scala | 36 ++++++++++++++++++ 5 files changed, 84 insertions(+), 5 deletions(-) create mode 100644 src/main/scala/de/upb/cs/swt/delphi/webapi/authorization/AccessToken.scala create mode 100644 src/main/scala/de/upb/cs/swt/delphi/webapi/authorization/AuthProvider.scala diff --git a/build.sbt b/build.sbt index bc0ac1d..f632d82 100644 --- a/build.sbt +++ b/build.sbt @@ -24,6 +24,8 @@ libraryDependencies ++= Seq( "com.sksamuel.elastic4s" %% "elastic4s-http-streams" % elastic4sVersion, ) +libraryDependencies += "com.pauldijou" %% "jwt-core" % "1.0.0" + libraryDependencies += "org.parboiled" %% "parboiled" % "2.1.4" libraryDependencies += "io.spray" %% "spray-json" % "1.3.3" libraryDependencies += "org.scalactic" %% "scalactic" % "3.0.4" diff --git a/src/main/scala/de/upb/cs/swt/delphi/instancemanagement/InstanceRegistry.scala b/src/main/scala/de/upb/cs/swt/delphi/instancemanagement/InstanceRegistry.scala index 3aa6b28..9b88234 100644 --- a/src/main/scala/de/upb/cs/swt/delphi/instancemanagement/InstanceRegistry.scala +++ b/src/main/scala/de/upb/cs/swt/delphi/instancemanagement/InstanceRegistry.scala @@ -20,9 +20,11 @@ import java.net.InetAddress import akka.http.scaladsl.Http import akka.http.scaladsl.model._ +import akka.http.scaladsl.model.headers.RawHeader import akka.http.scaladsl.unmarshalling.Unmarshal import akka.util.ByteString import de.upb.cs.swt.delphi.instancemanagement.InstanceEnums.{ComponentType, InstanceState} +import de.upb.cs.swt.delphi.webapi.authorization.AuthProvider import de.upb.cs.swt.delphi.webapi.{AppLogging, Configuration, _} import spray.json._ @@ -92,7 +94,7 @@ object InstanceRegistry extends InstanceJsonSupport with AppLogging { method = HttpMethods.POST, configuration.instanceRegistryUri + ReportOperationType.toOperationUriString(operationType, id)) - Await.result(Http(system).singleRequest(request) map { response => + Await.result(Http(system).singleRequest(request.withHeaders(RawHeader("Authorization",s"Bearer ${AuthProvider.generateJwt()}"))) map { response => if (response.status == StatusCodes.OK) { log.info(s"Successfully reported ${operationType.toString} to Instance Registry.") Success() @@ -146,7 +148,7 @@ object InstanceRegistry extends InstanceJsonSupport with AppLogging { configuration.instanceRegistryUri + s"/matchingInstance?Id=${configuration.assignedID.getOrElse(-1)}&ComponentType=ElasticSearch") - Await.result(Http(system).singleRequest(request) map { response => + Await.result(Http(system).singleRequest(request.withHeaders(RawHeader("Authorization",s"Bearer ${AuthProvider.generateJwt()}"))) map { response => response.status match { case StatusCodes.OK => try { @@ -189,7 +191,7 @@ object InstanceRegistry extends InstanceJsonSupport with AppLogging { configuration.instanceRegistryUri + s"/matchingResult?CallerId=${configuration.assignedID.getOrElse(-1)}&MatchedInstanceId=$idToPost&MatchingSuccessful=$isElasticSearchReachable") - Await.result(Http(system).singleRequest(request) map { response => + Await.result(Http(system).singleRequest(request.withHeaders(RawHeader("Authorization",s"Bearer ${AuthProvider.generateJwt()}"))) map { response => if (response.status == StatusCodes.OK) { log.info(s"Successfully posted matching result to Instance Registry.") Success() @@ -216,7 +218,7 @@ object InstanceRegistry extends InstanceJsonSupport with AppLogging { val request = HttpRequest(method = HttpMethods.POST, configuration.instanceRegistryUri + s"/deregister?Id=$id") - Await.result(Http(system).singleRequest(request) map { response => + Await.result(Http(system).singleRequest(request.withHeaders(RawHeader("Authorization",s"Bearer ${AuthProvider.generateJwt()}"))) map { response => if (response.status == StatusCodes.OK) { log.info("Successfully deregistered from Instance Registry.") Success() @@ -237,7 +239,7 @@ object InstanceRegistry extends InstanceJsonSupport with AppLogging { def postInstance(instance: Instance, uri: String)(): Future[HttpResponse] = { try { val request = HttpRequest(method = HttpMethods.POST, uri = uri, entity = instance.toJson(instanceFormat).toString()) - Http(system).singleRequest(request) + Http(system).singleRequest(request.withHeaders(RawHeader("Authorization",s"Bearer ${AuthProvider.generateJwt()}"))) } catch { case dx: DeserializationException => log.warning(s"Failed to deregister to Instance Registry, exception: $dx") diff --git a/src/main/scala/de/upb/cs/swt/delphi/webapi/Configuration.scala b/src/main/scala/de/upb/cs/swt/delphi/webapi/Configuration.scala index 84de7ed..9573eae 100644 --- a/src/main/scala/de/upb/cs/swt/delphi/webapi/Configuration.scala +++ b/src/main/scala/de/upb/cs/swt/delphi/webapi/Configuration.scala @@ -88,6 +88,8 @@ class Configuration( //Server and Elasticsearch configuration } lazy val instanceId: Option[Long] = InstanceRegistry.handleInstanceStart(configuration = this) + val jwtSecretKey: String = sys.env.getOrElse("DELPHI_JWT_SECRET","changeme") + } diff --git a/src/main/scala/de/upb/cs/swt/delphi/webapi/authorization/AccessToken.scala b/src/main/scala/de/upb/cs/swt/delphi/webapi/authorization/AccessToken.scala new file mode 100644 index 0000000..e9cf60a --- /dev/null +++ b/src/main/scala/de/upb/cs/swt/delphi/webapi/authorization/AccessToken.scala @@ -0,0 +1,37 @@ +// Copyright (C) 2018 The Delphi Team. +// See the LICENCE file distributed with this work for additional +// information regarding copyright ownership. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package de.upb.cs.swt.delphi.webapi.authorization + +import akka.http.scaladsl.model.DateTime +import de.upb.cs.swt.delphi.webapi.authorization.AccessTokenEnums.UserType + +final case class AccessToken(userId: String, + userType: UserType, + expiresAt: DateTime, + issuedAt: DateTime, + notBefore: DateTime, + scope: List[String]) + +object AccessTokenEnums { + + type UserType = UserType.Value + + object UserType extends Enumeration { + val User : Value = Value("User") + val Admin: Value = Value("Admin") + val Component: Value = Value("Component") + } +} diff --git a/src/main/scala/de/upb/cs/swt/delphi/webapi/authorization/AuthProvider.scala b/src/main/scala/de/upb/cs/swt/delphi/webapi/authorization/AuthProvider.scala new file mode 100644 index 0000000..902d9e6 --- /dev/null +++ b/src/main/scala/de/upb/cs/swt/delphi/webapi/authorization/AuthProvider.scala @@ -0,0 +1,36 @@ +// Copyright (C) 2018 The Delphi Team. +// See the LICENCE file distributed with this work for additional +// information regarding copyright ownership. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package de.upb.cs.swt.delphi.webapi.authorization + +import de.upb.cs.swt.delphi.webapi +import pdi.jwt.{Jwt, JwtAlgorithm, JwtClaim} +import spray.json.{JsArray, JsString} + +object AuthProvider { + + def generateJwt(validFor: Long = 3): String = { + val claim = JwtClaim() + .issuedNow + .expiresIn(validFor) + .startsNow + .+("user_id", webapi.configuration.instanceName) + .+("user_type", "Component") + + + Jwt.encode(claim, webapi.configuration.jwtSecretKey, JwtAlgorithm.HS256) + } + +} From 706cac95a521e31d3da4a9020ef528edb70359c3 Mon Sep 17 00:00:00 2001 From: Hariharan Ramanathan Date: Wed, 19 Dec 2018 16:04:16 +0100 Subject: [PATCH 07/16] Added error handling on result size greater than max result window --- .../cs/swt/delphi/webapi/DelphiRoutes.scala | 13 +++- .../delphi/webapi/search/QueryRequest.scala | 17 ++++- .../webapi/search/QueryRequestJson.scala | 15 +++++ .../delphi/webapi/search/SearchQuery.scala | 50 ++++++++++---- .../search/elastic4s/extns/package.scala | 66 +++++++++++++++++++ .../cs/swt/delphi/webapi/search/package.scala | 33 ++++++++++ 6 files changed, 179 insertions(+), 15 deletions(-) create mode 100644 src/main/scala/de/upb/cs/swt/delphi/webapi/search/elastic4s/extns/package.scala create mode 100644 src/main/scala/de/upb/cs/swt/delphi/webapi/search/package.scala diff --git a/src/main/scala/de/upb/cs/swt/delphi/webapi/DelphiRoutes.scala b/src/main/scala/de/upb/cs/swt/delphi/webapi/DelphiRoutes.scala index eab3de9..4fcfd8b 100644 --- a/src/main/scala/de/upb/cs/swt/delphi/webapi/DelphiRoutes.scala +++ b/src/main/scala/de/upb/cs/swt/delphi/webapi/DelphiRoutes.scala @@ -27,7 +27,7 @@ import de.upb.cs.swt.delphi.webapi.IpLogActor._ import de.upb.cs.swt.delphi.webapi.StatisticsJson._ import de.upb.cs.swt.delphi.webapi.artifacts.ArtifactJson._ import de.upb.cs.swt.delphi.webapi.search.QueryRequestJson._ -import de.upb.cs.swt.delphi.webapi.search.{QueryRequest, SearchQuery} +import de.upb.cs.swt.delphi.webapi.search.{QueryRequest, SearchError, SearchQuery} import spray.json._ import scala.concurrent.duration._ @@ -118,7 +118,16 @@ class DelphiRoutes(requestLimiter: RequestLimitScheduler) extends JsonSupport wi complete( new SearchQuery(configuration, featureExtractor).search(input) match { case Success(result) => prettyPrint(pretty, result.toJson) - case Failure(e) => e.getMessage + case Failure(e) => { + e match { + case se: SearchError => { + se.toJson + } + case _ => { + new SearchError("Search query failed").toJson + } + } + } } ) } diff --git a/src/main/scala/de/upb/cs/swt/delphi/webapi/search/QueryRequest.scala b/src/main/scala/de/upb/cs/swt/delphi/webapi/search/QueryRequest.scala index 3479577..ffd57df 100644 --- a/src/main/scala/de/upb/cs/swt/delphi/webapi/search/QueryRequest.scala +++ b/src/main/scala/de/upb/cs/swt/delphi/webapi/search/QueryRequest.scala @@ -1,3 +1,18 @@ +// Copyright (C) 2018 The Delphi Team. +// See the LICENCE file distributed with this work for additional +// information regarding copyright ownership. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. package de.upb.cs.swt.delphi.webapi.search -case class QueryRequest (query : String, limit : Option[Int] = Some(50)) +case class QueryRequest (query : String, limit : Option[Int] = Some(defaultFetchSize)) diff --git a/src/main/scala/de/upb/cs/swt/delphi/webapi/search/QueryRequestJson.scala b/src/main/scala/de/upb/cs/swt/delphi/webapi/search/QueryRequestJson.scala index 6ae239e..09699a6 100644 --- a/src/main/scala/de/upb/cs/swt/delphi/webapi/search/QueryRequestJson.scala +++ b/src/main/scala/de/upb/cs/swt/delphi/webapi/search/QueryRequestJson.scala @@ -1,3 +1,18 @@ +// Copyright (C) 2018 The Delphi Team. +// See the LICENCE file distributed with this work for additional +// information regarding copyright ownership. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. package de.upb.cs.swt.delphi.webapi.search import spray.json.DefaultJsonProtocol diff --git a/src/main/scala/de/upb/cs/swt/delphi/webapi/search/SearchQuery.scala b/src/main/scala/de/upb/cs/swt/delphi/webapi/search/SearchQuery.scala index 80c11b1..ca8b616 100644 --- a/src/main/scala/de/upb/cs/swt/delphi/webapi/search/SearchQuery.scala +++ b/src/main/scala/de/upb/cs/swt/delphi/webapi/search/SearchQuery.scala @@ -18,18 +18,18 @@ package de.upb.cs.swt.delphi.webapi.search import com.sksamuel.elastic4s.http.ElasticDsl._ import com.sksamuel.elastic4s.http.search.SearchHits -import com.sksamuel.elastic4s.http.{ElasticClient, RequestSuccess} +import com.sksamuel.elastic4s.http.{ElasticClient, RequestFailure, RequestSuccess} import com.sksamuel.elastic4s.searches.queries.{NoopQuery, Query} -import de.upb.cs.swt.delphi.webapi.{Configuration, FeatureQuery} -import de.upb.cs.swt.delphi.webapi.artifacts.ArtifactTransformer +import de.upb.cs.swt.delphi.webapi.artifacts.{Artifact, ArtifactTransformer} import de.upb.cs.swt.delphi.webapi.search.querylanguage._ +import de.upb.cs.swt.delphi.webapi.{Configuration, FeatureQuery} import scala.util.{Failure, Success, Try} class SearchQuery(configuration: Configuration, featureExtractor: FeatureQuery) { private val client = ElasticClient(configuration.elasticsearchClientUri) - private def checkAndExecuteParsedQuery(ast: CombinatorialExpr, limit : Int): Try[SearchHits] = { + private def checkAndExecuteParsedQuery(ast: CombinatorialExpr, limit: Int): Try[SearchHits] = { val fields = collectFieldNames(ast) if (fields.diff(featureExtractor.featureList.toSeq).size > 0) return Failure(new IllegalArgumentException("Unknown field name used.")) @@ -109,16 +109,42 @@ class SearchQuery(configuration: Configuration, featureExtractor: FeatureQuery) } } - def search(query: QueryRequest) = { - val parserResult = new Syntax(query.query).QueryRule.run() - parserResult match { - case Failure(e) => Failure(e) - case Success(ast) => { - checkAndExecuteParsedQuery(ast, query.limit.getOrElse(50)) match { - case Failure(e) => Failure(e) - case Success(hits) => Success(ArtifactTransformer.transformResults(hits)) + def checkValidSize: Option[Int] = { + import elastic4s.extns._ + import elastic4s.extns.ElasticDslExtn._ + val params = Map("include_defaults" -> true) + val query = SettingsRequest("delphi", params) + val res = client.execute { + query + }.await + res match { + case RequestSuccess(_, b, _, _) => { + maxResultSize(b, configuration) + } + case RequestFailure(_, _, _, _) => { + None + } + } + } + + def search(query: QueryRequest): Try[Array[Artifact]] = { + lazy val size = checkValidSize + val validSize = size.exists(query.limit.getOrElse(defaultFetchSize) < _) + if (validSize) { + val parserResult = new Syntax(query.query).QueryRule.run() + parserResult match { + case Failure(e) => Failure(e) + case Success(ast) => { + checkAndExecuteParsedQuery(ast, query.limit.getOrElse(defaultFetchSize)) match { + case Failure(e) => Failure(e) + case Success(hits) => Success(ArtifactTransformer.transformResults(hits)) + } } } } + else { + val errorMsg = new SearchError(s"Query limit exceeded default limit: ${query.limit}>${size}") + Failure(errorMsg) + } } } diff --git a/src/main/scala/de/upb/cs/swt/delphi/webapi/search/elastic4s/extns/package.scala b/src/main/scala/de/upb/cs/swt/delphi/webapi/search/elastic4s/extns/package.scala new file mode 100644 index 0000000..fe5332a --- /dev/null +++ b/src/main/scala/de/upb/cs/swt/delphi/webapi/search/elastic4s/extns/package.scala @@ -0,0 +1,66 @@ +// Copyright (C) 2018 The Delphi Team. +// See the LICENCE file distributed with this work for additional +// information regarding copyright ownership. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package de.upb.cs.swt.delphi.webapi.search.elastic4s + +import com.sksamuel.elastic4s.http.settings.IndexSettingsResponse +import com.sksamuel.elastic4s.http.{ElasticRequest, Handler} +import com.sksamuel.elastic4s.json.JacksonSupport +import de.upb.cs.swt.delphi.webapi.Configuration + +package object extns { + + case class SettingsRequest(index: String, params: Map[String, Any]) + + trait RichSettingsHandler { + + implicit object RichGetSettings extends Handler[SettingsRequest, IndexSettingsResponse] { + + override def build(request: SettingsRequest): ElasticRequest = { + val endpoint = "/" + request.index + "/_settings" + val req = ElasticRequest("GET", endpoint, request.params) + req + } + } + + } + + def maxResultSize(res: Option[String], config: Configuration): Option[Int] = { + res match { + case Some(j) => { + val custom = s"/${config.esIndex}/settings/index" + val default = s"/${config.esIndex}/defaults/index" + val target = "max_result_window" + val node = JacksonSupport.mapper.readTree(j) + val size = if (node.at(custom).has(target)) { + Some(node.at(custom + "/" + target).asInt()) + } + else { + Some(node.at(default + "/" + target).asInt()) + } + size + } + case None => { + None + } + } + } + + + trait ElasticDslExtn extends RichSettingsHandler + + object ElasticDslExtn extends ElasticDslExtn + +} diff --git a/src/main/scala/de/upb/cs/swt/delphi/webapi/search/package.scala b/src/main/scala/de/upb/cs/swt/delphi/webapi/search/package.scala new file mode 100644 index 0000000..e0b8189 --- /dev/null +++ b/src/main/scala/de/upb/cs/swt/delphi/webapi/search/package.scala @@ -0,0 +1,33 @@ +// Copyright (C) 2018 The Delphi Team. +// See the LICENCE file distributed with this work for additional +// information regarding copyright ownership. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package de.upb.cs.swt.delphi.webapi + +import spray.json._ + +package object search { + + val defaultFetchSize = 50 + + class SearchError(msg: String) extends RuntimeException(msg) with JsonSupport + + implicit val searchErrorWriter = new JsonWriter[SearchError] { + override def write(obj: SearchError): JsValue = { + JsObject("msg" -> JsString(obj.getMessage)) + } + } + +} From 87606666c471c4a644dadc53d030b99851e086a6 Mon Sep 17 00:00:00 2001 From: Hariharan Ramanathan Date: Wed, 19 Dec 2018 17:17:54 +0100 Subject: [PATCH 08/16] Added test case for failure on large input result --- .../swt/delphi/webapi/SearchQueryTest.scala | 19 +++++++++++++------ .../delphi/webapi/search/SearchQuery.scala | 2 +- 2 files changed, 14 insertions(+), 7 deletions(-) diff --git a/src/it/scala/de/upb/cs/swt/delphi/webapi/SearchQueryTest.scala b/src/it/scala/de/upb/cs/swt/delphi/webapi/SearchQueryTest.scala index 1f157d2..708d084 100644 --- a/src/it/scala/de/upb/cs/swt/delphi/webapi/SearchQueryTest.scala +++ b/src/it/scala/de/upb/cs/swt/delphi/webapi/SearchQueryTest.scala @@ -16,17 +16,24 @@ package de.upb.cs.swt.delphi.webapi -import de.upb.cs.swt.delphi.webapi.search.{QueryRequest, SearchQuery} +import de.upb.cs.swt.delphi.webapi.search.{QueryRequest, SearchError, SearchQuery} import org.scalatest.{FlatSpec, Matchers} -import scala.util.Success +import scala.util.Failure class SearchQueryTest extends FlatSpec with Matchers { - "Search query" should "check for fields" in { + "Search query" should "fail on large request limit" in { val configuration = new Configuration() val q = new SearchQuery(configuration, new FeatureQuery(configuration)) - - val response = q.search(QueryRequest("[if_icmpeq (opcode:159)]>1")) - response shouldBe a [Success[_]] + val size = 20000 + val response = q.search(QueryRequest("[dstore_1 (opcode:72)]<1", Some(size))) + response match { + case Failure(exception) => { + exception shouldBe a[SearchError] + } + case _ => { + fail("Limit exceeded should fail") + } + } } } diff --git a/src/main/scala/de/upb/cs/swt/delphi/webapi/search/SearchQuery.scala b/src/main/scala/de/upb/cs/swt/delphi/webapi/search/SearchQuery.scala index ca8b616..3caa421 100644 --- a/src/main/scala/de/upb/cs/swt/delphi/webapi/search/SearchQuery.scala +++ b/src/main/scala/de/upb/cs/swt/delphi/webapi/search/SearchQuery.scala @@ -129,7 +129,7 @@ class SearchQuery(configuration: Configuration, featureExtractor: FeatureQuery) def search(query: QueryRequest): Try[Array[Artifact]] = { lazy val size = checkValidSize - val validSize = size.exists(query.limit.getOrElse(defaultFetchSize) < _) + val validSize = size.exists(query.limit.getOrElse(defaultFetchSize) <= _) if (validSize) { val parserResult = new Syntax(query.query).QueryRule.run() parserResult match { From 59f1f83729c9779c1ced52a10338bae3839f66d9 Mon Sep 17 00:00:00 2001 From: Hariharan Ramanathan Date: Wed, 19 Dec 2018 17:49:30 +0100 Subject: [PATCH 09/16] Added fork to build.sbt --- build.sbt | 2 ++ project/build.properties | 2 +- 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/build.sbt b/build.sbt index bc0ac1d..0a9bab1 100644 --- a/build.sbt +++ b/build.sbt @@ -58,3 +58,5 @@ libraryDependencies ++= Seq( ) trapExit := false +fork := true +connectInput := true \ No newline at end of file diff --git a/project/build.properties b/project/build.properties index 9f782f7..c46277f 100644 --- a/project/build.properties +++ b/project/build.properties @@ -1 +1 @@ -sbt.version=1.1.1 \ No newline at end of file +sbt.version=1.2.7 \ No newline at end of file From 85ccf7704cacd22ef72213d7f2f182eff5123f6c Mon Sep 17 00:00:00 2001 From: Johannes Duesing Date: Wed, 19 Dec 2018 18:32:15 +0100 Subject: [PATCH 10/16] Use id as name for JWT where applicable --- .../swt/delphi/instancemanagement/InstanceRegistry.scala | 8 ++++++-- .../cs/swt/delphi/webapi/authorization/AuthProvider.scala | 7 +++---- 2 files changed, 9 insertions(+), 6 deletions(-) diff --git a/src/main/scala/de/upb/cs/swt/delphi/instancemanagement/InstanceRegistry.scala b/src/main/scala/de/upb/cs/swt/delphi/instancemanagement/InstanceRegistry.scala index 9b88234..2623bab 100644 --- a/src/main/scala/de/upb/cs/swt/delphi/instancemanagement/InstanceRegistry.scala +++ b/src/main/scala/de/upb/cs/swt/delphi/instancemanagement/InstanceRegistry.scala @@ -94,7 +94,10 @@ object InstanceRegistry extends InstanceJsonSupport with AppLogging { method = HttpMethods.POST, configuration.instanceRegistryUri + ReportOperationType.toOperationUriString(operationType, id)) - Await.result(Http(system).singleRequest(request.withHeaders(RawHeader("Authorization",s"Bearer ${AuthProvider.generateJwt()}"))) map { response => + val useGenericNameForToken = operationType == ReportOperationType.Start //Must use generic name for startup, no id known at that point + + Await.result(Http(system).singleRequest(request.withHeaders(RawHeader("Authorization", + s"Bearer ${AuthProvider.generateJwt(useGenericName = useGenericNameForToken)}"))) map { response => if (response.status == StatusCodes.OK) { log.info(s"Successfully reported ${operationType.toString} to Instance Registry.") Success() @@ -239,7 +242,8 @@ object InstanceRegistry extends InstanceJsonSupport with AppLogging { def postInstance(instance: Instance, uri: String)(): Future[HttpResponse] = { try { val request = HttpRequest(method = HttpMethods.POST, uri = uri, entity = instance.toJson(instanceFormat).toString()) - Http(system).singleRequest(request.withHeaders(RawHeader("Authorization",s"Bearer ${AuthProvider.generateJwt()}"))) + //Use generic name for startup, no id present at this point + Http(system).singleRequest(request.withHeaders(RawHeader("Authorization",s"Bearer ${AuthProvider.generateJwt(useGenericName = true)}"))) } catch { case dx: DeserializationException => log.warning(s"Failed to deregister to Instance Registry, exception: $dx") diff --git a/src/main/scala/de/upb/cs/swt/delphi/webapi/authorization/AuthProvider.scala b/src/main/scala/de/upb/cs/swt/delphi/webapi/authorization/AuthProvider.scala index 902d9e6..d06ae7b 100644 --- a/src/main/scala/de/upb/cs/swt/delphi/webapi/authorization/AuthProvider.scala +++ b/src/main/scala/de/upb/cs/swt/delphi/webapi/authorization/AuthProvider.scala @@ -17,16 +17,15 @@ package de.upb.cs.swt.delphi.webapi.authorization import de.upb.cs.swt.delphi.webapi import pdi.jwt.{Jwt, JwtAlgorithm, JwtClaim} -import spray.json.{JsArray, JsString} object AuthProvider { - def generateJwt(validFor: Long = 3): String = { + def generateJwt(validFor: Long = 1, useGenericName: Boolean = false): String = { val claim = JwtClaim() .issuedNow - .expiresIn(validFor) + .expiresIn(validFor * 60) .startsNow - .+("user_id", webapi.configuration.instanceName) + .+("user_id", if (useGenericName) webapi.configuration.instanceName else s"${webapi.configuration.assignedID.get}") .+("user_type", "Component") From 300caa1e82d42b838826dcc92cf6216ba974cc08 Mon Sep 17 00:00:00 2001 From: Johannes Duesing Date: Wed, 19 Dec 2018 18:32:50 +0100 Subject: [PATCH 11/16] Fixed application not terminating when connected to registry --- src/main/scala/de/upb/cs/swt/delphi/webapi/Server.scala | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/src/main/scala/de/upb/cs/swt/delphi/webapi/Server.scala b/src/main/scala/de/upb/cs/swt/delphi/webapi/Server.scala index 824200d..2f2684b 100644 --- a/src/main/scala/de/upb/cs/swt/delphi/webapi/Server.scala +++ b/src/main/scala/de/upb/cs/swt/delphi/webapi/Server.scala @@ -31,17 +31,13 @@ object Server extends HttpApp with JsonSupport with AppLogging { def main(args: Array[String]): Unit = { - sys.addShutdownHook({ - log.warning("Received shutdown signal.") - InstanceRegistry.handleInstanceStop(configuration) - }) StartupCheck.check(configuration) Server.startServer(configuration.bindHost, configuration.bindPort, system) - val terminationFuture = system.terminate() + InstanceRegistry.handleInstanceStop(configuration) - terminationFuture.onComplete { + system.terminate().onComplete{ sys.exit(0) } } From 8876011c6b3b037fae3f1e701cde46690da2e4bc Mon Sep 17 00:00:00 2001 From: Johannes Duesing Date: Thu, 3 Jan 2019 11:50:57 +0100 Subject: [PATCH 12/16] Removed unused file --- .../webapi/authorization/AccessToken.scala | 37 ------------------- 1 file changed, 37 deletions(-) delete mode 100644 src/main/scala/de/upb/cs/swt/delphi/webapi/authorization/AccessToken.scala diff --git a/src/main/scala/de/upb/cs/swt/delphi/webapi/authorization/AccessToken.scala b/src/main/scala/de/upb/cs/swt/delphi/webapi/authorization/AccessToken.scala deleted file mode 100644 index e9cf60a..0000000 --- a/src/main/scala/de/upb/cs/swt/delphi/webapi/authorization/AccessToken.scala +++ /dev/null @@ -1,37 +0,0 @@ -// Copyright (C) 2018 The Delphi Team. -// See the LICENCE file distributed with this work for additional -// information regarding copyright ownership. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -package de.upb.cs.swt.delphi.webapi.authorization - -import akka.http.scaladsl.model.DateTime -import de.upb.cs.swt.delphi.webapi.authorization.AccessTokenEnums.UserType - -final case class AccessToken(userId: String, - userType: UserType, - expiresAt: DateTime, - issuedAt: DateTime, - notBefore: DateTime, - scope: List[String]) - -object AccessTokenEnums { - - type UserType = UserType.Value - - object UserType extends Enumeration { - val User : Value = Value("User") - val Admin: Value = Value("Admin") - val Component: Value = Value("Component") - } -} From 90cc6e3839f0a1b79bf0a0529563cf6aed3d9f3d Mon Sep 17 00:00:00 2001 From: Johannes Duesing Date: Fri, 4 Jan 2019 13:38:51 +0100 Subject: [PATCH 13/16] Codestyle: Inserted spaces before plus operator --- .../upb/cs/swt/delphi/webapi/authorization/AuthProvider.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/main/scala/de/upb/cs/swt/delphi/webapi/authorization/AuthProvider.scala b/src/main/scala/de/upb/cs/swt/delphi/webapi/authorization/AuthProvider.scala index d06ae7b..f03ad12 100644 --- a/src/main/scala/de/upb/cs/swt/delphi/webapi/authorization/AuthProvider.scala +++ b/src/main/scala/de/upb/cs/swt/delphi/webapi/authorization/AuthProvider.scala @@ -25,8 +25,8 @@ object AuthProvider { .issuedNow .expiresIn(validFor * 60) .startsNow - .+("user_id", if (useGenericName) webapi.configuration.instanceName else s"${webapi.configuration.assignedID.get}") - .+("user_type", "Component") + . + ("user_id", if (useGenericName) webapi.configuration.instanceName else s"${webapi.configuration.assignedID.get}") + . + ("user_type", "Component") Jwt.encode(claim, webapi.configuration.jwtSecretKey, JwtAlgorithm.HS256) From 32b8a6f5210156d121df6835d9dd30d4eb0d5714 Mon Sep 17 00:00:00 2001 From: Johannes Duesing Date: Tue, 22 Jan 2019 09:38:29 +0100 Subject: [PATCH 14/16] Adapted registry interface to newest API version Removed unused code --- .../instancemanagement/InstanceRegistry.scala | 82 +++++-------------- 1 file changed, 19 insertions(+), 63 deletions(-) diff --git a/src/main/scala/de/upb/cs/swt/delphi/instancemanagement/InstanceRegistry.scala b/src/main/scala/de/upb/cs/swt/delphi/instancemanagement/InstanceRegistry.scala index 2623bab..56d550e 100644 --- a/src/main/scala/de/upb/cs/swt/delphi/instancemanagement/InstanceRegistry.scala +++ b/src/main/scala/de/upb/cs/swt/delphi/instancemanagement/InstanceRegistry.scala @@ -120,7 +120,7 @@ object InstanceRegistry extends InstanceJsonSupport with AppLogging { def register(configuration: Configuration): Try[Long] = { val instance = createInstance(None, configuration.bindPort, configuration.instanceName) - Await.result(postInstance(instance, configuration.instanceRegistryUri + "/register") map { response => + Await.result(postInstance(instance, configuration.instanceRegistryUri + "/instances/register") map { response => if (response.status == StatusCodes.OK) { Await.result(Unmarshal(response.entity).to[String] map { assignedID => val id = assignedID.toLong @@ -149,7 +149,7 @@ object InstanceRegistry extends InstanceJsonSupport with AppLogging { } else { val request = HttpRequest(method = HttpMethods.GET, configuration.instanceRegistryUri + - s"/matchingInstance?Id=${configuration.assignedID.getOrElse(-1)}&ComponentType=ElasticSearch") + s"/instances/${configuration.assignedID.getOrElse(-1)}/matchingInstance?ComponentType=ElasticSearch") Await.result(Http(system).singleRequest(request.withHeaders(RawHeader("Authorization",s"Bearer ${AuthProvider.generateJwt()}"))) map { response => response.status match { @@ -189,12 +189,17 @@ object InstanceRegistry extends InstanceJsonSupport with AppLogging { Failure(new RuntimeException("Cannot post matching result to Instance Registry, assigned ElasticSearch instance has no ID.")) } else { val idToPost = configuration.elasticsearchInstance.id.getOrElse(-1L) + + val MatchingData = JsObject("MatchingSuccessful" -> JsBoolean(isElasticSearchReachable), + "SenderId" -> JsNumber(configuration.assignedID.getOrElse(-1L))) + val request = HttpRequest( method = HttpMethods.POST, - configuration.instanceRegistryUri + - s"/matchingResult?CallerId=${configuration.assignedID.getOrElse(-1)}&MatchedInstanceId=$idToPost&MatchingSuccessful=$isElasticSearchReachable") + configuration.instanceRegistryUri + s"/instances/$idToPost/matchingResult") - Await.result(Http(system).singleRequest(request.withHeaders(RawHeader("Authorization",s"Bearer ${AuthProvider.generateJwt()}"))) map { response => + Await.result(Http(system).singleRequest(request + .withHeaders(RawHeader("Authorization",s"Bearer ${AuthProvider.generateJwt()}")) + .withEntity(ContentTypes.`application/json`, ByteString(MatchingData.toJson.toString))) map { response => if (response.status == StatusCodes.OK) { log.info(s"Successfully posted matching result to Instance Registry.") Success() @@ -219,7 +224,8 @@ object InstanceRegistry extends InstanceJsonSupport with AppLogging { } else { val id: Long = configuration.assignedID.getOrElse(-1L) - val request = HttpRequest(method = HttpMethods.POST, configuration.instanceRegistryUri + s"/deregister?Id=$id") + val request = HttpRequest(method = HttpMethods.POST, configuration.instanceRegistryUri + + s"/instances/$id/deregister") Await.result(Http(system).singleRequest(request.withHeaders(RawHeader("Authorization",s"Bearer ${AuthProvider.generateJwt()}"))) map { response => if (response.status == StatusCodes.OK) { @@ -241,9 +247,11 @@ object InstanceRegistry extends InstanceJsonSupport with AppLogging { def postInstance(instance: Instance, uri: String)(): Future[HttpResponse] = { try { - val request = HttpRequest(method = HttpMethods.POST, uri = uri, entity = instance.toJson(instanceFormat).toString()) + val request = HttpRequest(method = HttpMethods.POST, uri = uri) //Use generic name for startup, no id present at this point - Http(system).singleRequest(request.withHeaders(RawHeader("Authorization",s"Bearer ${AuthProvider.generateJwt(useGenericName = true)}"))) + Http(system).singleRequest(request + .withHeaders(RawHeader("Authorization",s"Bearer ${AuthProvider.generateJwt(useGenericName = true)}")) + .withEntity(ContentTypes.`application/json`, ByteString(instance.toJson(instanceFormat).toString))) } catch { case dx: DeserializationException => log.warning(s"Failed to deregister to Instance Registry, exception: $dx") @@ -256,58 +264,6 @@ object InstanceRegistry extends InstanceJsonSupport with AppLogging { Instance(id, InetAddress.getLocalHost.getHostAddress, controlPort, name, ComponentType.WebApi, None, InstanceState.Running, List.empty[String], List.empty[InstanceLink], List.empty[InstanceLink]) - def reportStart(id: String, configuration: Configuration): Try[ResponseEntity] = { - val request = HttpRequest(method = HttpMethods.GET, configuration.instanceRegistryUri + "/reportStart") - Await.result(Http(system).singleRequest(request) map { response => - if (response.status == StatusCodes.OK) { - Success(response.entity) - } - else { - val statuscode = response.status - log.warning(s"Failed to perform reportStart, server returned $statuscode") - Failure(new RuntimeException(s"Failed to perform reportStart, server returned $statuscode")) - } - } recover { case ex => - log.warning(s"Failed to perform reportStart, exception: $ex") - Failure(new RuntimeException(s"Failed to perform reportStart, server returned, exception: $ex")) - }, Duration.Inf) - } - - def reportFailure(id: String, configuration: Configuration): Try[ResponseEntity] = { - - val request = HttpRequest(method = HttpMethods.GET, configuration.instanceRegistryUri + "/reportFailure") - Await.result(Http(system).singleRequest(request) map { response => - if (response.status == StatusCodes.OK) { - Success(response.entity) - } - else { - val statuscode = response.status - log.warning(s"Failed to perform reportFailure, server returned $statuscode") - Failure(new RuntimeException(s"Failed to perform reportFailure, server returned $statuscode")) - } - } recover { case ex => - log.warning(s"Failed to perform reportFailure, server returned, exception: $ex") - Failure(new RuntimeException(s"Failed to perform reportFailure, server returned, exception: $ex")) - }, Duration.Inf) - } - - def reportStop(id: String, configuration: Configuration): Try[ResponseEntity] = { - - val request = HttpRequest(method = HttpMethods.GET, configuration.instanceRegistryUri + "/reportStop") - Await.result(Http(system).singleRequest(request) map { response => - if (response.status == StatusCodes.OK) { - Success(response.entity) - } - else { - val statuscode = response.status - log.warning(s"Failed to perform reportStop, server returned $statuscode") - Failure(new RuntimeException(s"Failed to perform reportStop, server returned $statuscode")) - } - } recover { case ex => - log.warning(s"Failed to perform reportStop, server returned, exception: $ex") - Failure(new RuntimeException(s"Failed to perform reportStop, server returned, exception: $ex")) - }, Duration.Inf) - } object ReportOperationType extends Enumeration { val Start: Value = Value("Start") @@ -317,11 +273,11 @@ object InstanceRegistry extends InstanceJsonSupport with AppLogging { def toOperationUriString(operation: ReportOperationType.Value, id: Long): String = { operation match { case Start => - s"/reportStart?Id=$id" + s"/instances/$id/reportStart" case Stop => - s"/reportStop?Id=$id" + s"/instances/$id/reportStop" case _ => - s"/reportFailure?Id=$id" + s"/instances/$id/reportFailure" } } } From a186175d21e9e8cb9e7f9383fbf601dbe0ced2a5 Mon Sep 17 00:00:00 2001 From: Johannes Duesing Date: Thu, 7 Feb 2019 15:01:43 +0100 Subject: [PATCH 15/16] Fixed variable name being uppercase (codestyle) --- .../cs/swt/delphi/instancemanagement/InstanceRegistry.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/main/scala/de/upb/cs/swt/delphi/instancemanagement/InstanceRegistry.scala b/src/main/scala/de/upb/cs/swt/delphi/instancemanagement/InstanceRegistry.scala index 56d550e..6a7517c 100644 --- a/src/main/scala/de/upb/cs/swt/delphi/instancemanagement/InstanceRegistry.scala +++ b/src/main/scala/de/upb/cs/swt/delphi/instancemanagement/InstanceRegistry.scala @@ -190,7 +190,7 @@ object InstanceRegistry extends InstanceJsonSupport with AppLogging { } else { val idToPost = configuration.elasticsearchInstance.id.getOrElse(-1L) - val MatchingData = JsObject("MatchingSuccessful" -> JsBoolean(isElasticSearchReachable), + val matchingData = JsObject("MatchingSuccessful" -> JsBoolean(isElasticSearchReachable), "SenderId" -> JsNumber(configuration.assignedID.getOrElse(-1L))) val request = HttpRequest( @@ -199,7 +199,7 @@ object InstanceRegistry extends InstanceJsonSupport with AppLogging { Await.result(Http(system).singleRequest(request .withHeaders(RawHeader("Authorization",s"Bearer ${AuthProvider.generateJwt()}")) - .withEntity(ContentTypes.`application/json`, ByteString(MatchingData.toJson.toString))) map { response => + .withEntity(ContentTypes.`application/json`, ByteString(matchingData.toJson.toString))) map { response => if (response.status == StatusCodes.OK) { log.info(s"Successfully posted matching result to Instance Registry.") Success() From 4bd6e3f666cb1ebf8367486d451281bbb45211dc Mon Sep 17 00:00:00 2001 From: Mitali Gupta Date: Fri, 24 May 2019 18:02:17 +0200 Subject: [PATCH 16/16] update build.sbt file Pushing feature branch and fixing vulnerability issue because of jackson dependency --- build.sbt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/build.sbt b/build.sbt index 2f9bf82..2b507e0 100644 --- a/build.sbt +++ b/build.sbt @@ -56,7 +56,7 @@ scalastyleConfig := baseDirectory.value / "project" / "scalastyle-config.xml" // Pinning secure versions of insecure transitive libraryDependencies // Please update when updating dependencies above (including Play plugin) libraryDependencies ++= Seq( - "com.fasterxml.jackson.core" % "jackson-databind" % "2.9.7" + "com.fasterxml.jackson.core" % "jackson-databind" % "2.9.9" ) trapExit := false