diff --git a/.gitignore b/.gitignore index e051c58..e21d42e 100644 --- a/.gitignore +++ b/.gitignore @@ -3,3 +3,4 @@ .idea project/target target +local-path \ No newline at end of file diff --git a/src/it/scala/de/upb/cs/swt/delphi/webapi/RequestLimitCheck.scala b/src/it/scala/de/upb/cs/swt/delphi/webapi/RequestLimitCheck.scala new file mode 100644 index 0000000..7a8e6b3 --- /dev/null +++ b/src/it/scala/de/upb/cs/swt/delphi/webapi/RequestLimitCheck.scala @@ -0,0 +1,80 @@ +// Copyright (C) 2018 The Delphi Team. +// See the LICENCE file distributed with this work for additional +// information regarding copyright ownership. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package de.upb.cs.swt.delphi.webapi + +import akka.http.scaladsl.Http +import akka.http.scaladsl.model.{HttpRequest, HttpResponse} +import akka.http.scaladsl.unmarshalling.{Unmarshal, Unmarshaller} +import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpec} +import spray.json._ + +import scala.concurrent.duration._ +import scala.concurrent.{Await, Future} +import scala.util.{Failure, Success} + +/** + * @author Hariharan. + */ +class RequestLimitCheck extends WordSpec with Matchers with BeforeAndAfterAll with JsonSupport { + val delphiRoutes = DelphiRoutes() + val serverBinding: Future[Http.ServerBinding] = Http() + .bindAndHandle(delphiRoutes, "localhost", 8085) + + override protected def beforeAll(): Unit = { + serverBinding.onComplete { + case Success(server) => + println(s"Server started at http://${server.localAddress.getHostString}:${server.localAddress.getPort}/") + case Failure(e) => + e.printStackTrace() + sys.exit(0) + } + } + + + "Requests" should { + "throttle when limit reached" in { + def responseFuture: Future[HttpResponse] = Http() + .singleRequest(HttpRequest(uri = "http://localhost:8085/version")) + + //Completing request limit + for (i <- (1 to maxIndividualReq)) { + Await.result(responseFuture, 1.second) + } + case class LimitMsg(msg: String) + implicit val msgFormat = jsonFormat1(LimitMsg) + + val limitReachedFuture = responseFuture + limitReachedFuture.onComplete { + case Success(res) => { + val msgPromise = Unmarshal(res.entity).to[LimitMsg] + msgPromise.onComplete { + case Success(limitMsg) => { + assertResult("Request limit exceeded")(limitMsg.msg) + } + case Failure(exception) => { + fail(exception) + } + } + } + case Failure(exception) => { + fail(exception) + } + } + Await.result(system.terminate(), 5.seconds) + } + } +} diff --git a/src/main/scala/de/upb/cs/swt/delphi/instancemanagement/InstanceRegistry.scala b/src/main/scala/de/upb/cs/swt/delphi/instancemanagement/InstanceRegistry.scala index c908576..9ca7c36 100644 --- a/src/main/scala/de/upb/cs/swt/delphi/instancemanagement/InstanceRegistry.scala +++ b/src/main/scala/de/upb/cs/swt/delphi/instancemanagement/InstanceRegistry.scala @@ -18,31 +18,25 @@ package de.upb.cs.swt.delphi.instancemanagement import java.net.InetAddress -import akka.actor.ActorSystem import akka.http.scaladsl.Http import akka.http.scaladsl.model._ import akka.http.scaladsl.unmarshalling.Unmarshal -import akka.stream.ActorMaterializer import akka.util.ByteString import de.upb.cs.swt.delphi.instancemanagement.InstanceEnums.{ComponentType, InstanceState} -import de.upb.cs.swt.delphi.webapi.{AppLogging, Configuration, Server} +import de.upb.cs.swt.delphi.webapi.{AppLogging, Configuration, _} +import spray.json._ -import scala.concurrent.{Await, ExecutionContext, Future} import scala.concurrent.duration._ +import scala.concurrent.{Await, Future} import scala.util.{Failure, Success, Try} -import spray.json._ -object InstanceRegistry extends JsonSupport with AppLogging -{ +object InstanceRegistry extends JsonSupport with AppLogging { - implicit val system : ActorSystem = Server.system - implicit val ec : ExecutionContext = system.dispatcher - implicit val materializer : ActorMaterializer = Server.materializer - lazy val instanceIdFromEnv : Option[Long] = Try[Long](sys.env("INSTANCE_ID").toLong).toOption + lazy val instanceIdFromEnv: Option[Long] = Try[Long](sys.env("INSTANCE_ID").toLong).toOption - def handleInstanceStart(configuration: Configuration) : Option[Long] = { + def handleInstanceStart(configuration: Configuration): Option[Long] = { instanceIdFromEnv match { case Some(id) => reportStart(configuration) match { @@ -57,49 +51,49 @@ object InstanceRegistry extends JsonSupport with AppLogging } } - def handleInstanceStop(configuration: Configuration) : Try[Unit] = { - if(instanceIdFromEnv.isDefined) { + def handleInstanceStop(configuration: Configuration): Try[Unit] = { + if (instanceIdFromEnv.isDefined) { reportStop(configuration) } else { deregister(configuration) } } - def handleInstanceFailure(configuration: Configuration) : Try[Unit] = { - if(instanceIdFromEnv.isDefined) { + def handleInstanceFailure(configuration: Configuration): Try[Unit] = { + if (instanceIdFromEnv.isDefined) { reportFailure(configuration) } else { deregister(configuration) } } - def reportStart(configuration: Configuration) : Try[Unit] = executeReportOperation(configuration, ReportOperationType.Start) + def reportStart(configuration: Configuration): Try[Unit] = executeReportOperation(configuration, ReportOperationType.Start) - def reportStop(configuration: Configuration) : Try[Unit] = { - if(configuration.usingInstanceRegistry) { + def reportStop(configuration: Configuration): Try[Unit] = { + if (configuration.usingInstanceRegistry) { executeReportOperation(configuration, ReportOperationType.Stop) } else { Failure(new RuntimeException("Cannot report stop, no instance registry available.")) } } - def reportFailure(configuration: Configuration) : Try[Unit] = { - if(configuration.usingInstanceRegistry){ + def reportFailure(configuration: Configuration): Try[Unit] = { + if (configuration.usingInstanceRegistry) { executeReportOperation(configuration, ReportOperationType.Failure) } else { Failure(new RuntimeException("Cannot report failure, no instance registry available.")) } } - private def executeReportOperation(configuration: Configuration, operationType: ReportOperationType.Value) : Try[Unit] = { + private def executeReportOperation(configuration: Configuration, operationType: ReportOperationType.Value): Try[Unit] = { instanceIdFromEnv match { case Some(id) => val request = HttpRequest( method = HttpMethods.POST, configuration.instanceRegistryUri + ReportOperationType.toOperationUriString(operationType, id)) - Await.result(Http(system).singleRequest(request) map {response => - if(response.status == StatusCodes.OK){ + Await.result(Http(system).singleRequest(request) map { response => + if (response.status == StatusCodes.OK) { log.info(s"Successfully reported ${operationType.toString} to Instance Registry.") Success() } @@ -108,7 +102,7 @@ object InstanceRegistry extends JsonSupport with AppLogging Failure(new RuntimeException(s"Failed to report ${operationType.toString} to Instance Registry, server returned ${response.status}")) } - } recover {case ex => + } recover { case ex => log.warning(s"Failed to report ${operationType.toString} to Instance Registry, exception: $ex") Failure(new RuntimeException(s"Failed to report ${operationType.toString} to Instance Registry, exception: $ex")) }, Duration.Inf) @@ -117,11 +111,12 @@ object InstanceRegistry extends JsonSupport with AppLogging Failure(new RuntimeException(s"Cannot report ${operationType.toString} to Instance Registry, no instance id is present in env var 'INSTANCE_ID'.")) } } - def register(configuration: Configuration) :Try[Long] = { - val instance = createInstance(None,configuration.bindPort, configuration.instanceName) - Await.result(postInstance(instance, configuration.instanceRegistryUri + "/register") map {response => - if(response.status == StatusCodes.OK){ + def register(configuration: Configuration): Try[Long] = { + val instance = createInstance(None, configuration.bindPort, configuration.instanceName) + + Await.result(postInstance(instance, configuration.instanceRegistryUri + "/register") map { response => + if (response.status == StatusCodes.OK) { Await.result(Unmarshal(response.entity).to[String] map { assignedID => val id = assignedID.toLong log.info(s"Successfully registered at Instance Registry, got ID $id.") @@ -137,25 +132,25 @@ object InstanceRegistry extends JsonSupport with AppLogging Failure(new RuntimeException(s"Failed to register at Instance Registry, server returned $statuscode")) } - } recover {case ex => + } recover { case ex => log.warning(s"Failed to register at Instance Registry, exception: $ex") Failure(ex) }, Duration.Inf) } - def retrieveElasticSearchInstance(configuration: Configuration) : Try[Instance] = { - if(!configuration.usingInstanceRegistry) { + def retrieveElasticSearchInstance(configuration: Configuration): Try[Instance] = { + if (!configuration.usingInstanceRegistry) { Failure(new RuntimeException("Cannot get ElasticSearch instance from Instance Registry, no Instance Registry available.")) } else { val request = HttpRequest(method = HttpMethods.GET, configuration.instanceRegistryUri + s"/matchingInstance?Id=${configuration.assignedID.getOrElse(-1)}&ComponentType=ElasticSearch") - Await.result(Http(system).singleRequest(request) map {response => + Await.result(Http(system).singleRequest(request) map { response => response.status match { case StatusCodes.OK => try { - val instanceString : String = Await.result(response.entity.dataBytes.runFold(ByteString(""))(_ ++ _).map(_.utf8String), 5 seconds) + val instanceString: String = Await.result(response.entity.dataBytes.runFold(ByteString(""))(_ ++ _).map(_.utf8String), 5 seconds) val esInstance = instanceString.parseJson.convertTo[Instance](instanceFormat) val elasticIP = esInstance.host log.info(s"Instance Registry assigned ElasticSearch instance at $elasticIP") @@ -180,12 +175,12 @@ object InstanceRegistry extends JsonSupport with AppLogging } } - def sendMatchingResult(isElasticSearchReachable : Boolean, configuration: Configuration) : Try[Unit] = { + def sendMatchingResult(isElasticSearchReachable: Boolean, configuration: Configuration): Try[Unit] = { - if(!configuration.usingInstanceRegistry) { + if (!configuration.usingInstanceRegistry) { Failure(new RuntimeException("Cannot post matching result to Instance Registry, no Instance Registry available.")) } else { - if(configuration.elasticsearchInstance.id.isEmpty) { + if (configuration.elasticsearchInstance.id.isEmpty) { Failure(new RuntimeException("Cannot post matching result to Instance Registry, assigned ElasticSearch instance has no ID.")) } else { val idToPost = configuration.elasticsearchInstance.id.getOrElse(-1L) @@ -194,8 +189,8 @@ object InstanceRegistry extends JsonSupport with AppLogging configuration.instanceRegistryUri + s"/matchingResult?CallerId=${configuration.assignedID.getOrElse(-1)}&MatchedInstanceId=$idToPost&MatchingSuccessful=$isElasticSearchReachable") - Await.result(Http(system).singleRequest(request) map {response => - if(response.status == StatusCodes.OK){ + Await.result(Http(system).singleRequest(request) map { response => + if (response.status == StatusCodes.OK) { log.info(s"Successfully posted matching result to Instance Registry.") Success() } @@ -204,7 +199,7 @@ object InstanceRegistry extends JsonSupport with AppLogging Failure(new RuntimeException(s"Failed to post matching result to Instance Registry, server returned ${response.status}")) } - } recover {case ex => + } recover { case ex => log.warning(s"Failed to post matching result to Instance Registry, exception: $ex") Failure(new RuntimeException(s"Failed to post matching result tot Instance Registry, exception: $ex")) }, Duration.Inf) @@ -213,16 +208,16 @@ object InstanceRegistry extends JsonSupport with AppLogging } - def deregister(configuration: Configuration) : Try[Unit] = { - if(!configuration.usingInstanceRegistry){ + def deregister(configuration: Configuration): Try[Unit] = { + if (!configuration.usingInstanceRegistry) { Failure(new RuntimeException("Cannot deregister from Instance Registry, no Instance Registry available.")) } else { - val id : Long = configuration.assignedID.getOrElse(-1L) + val id: Long = configuration.assignedID.getOrElse(-1L) val request = HttpRequest(method = HttpMethods.POST, configuration.instanceRegistryUri + s"/deregister?Id=$id") - Await.result(Http(system).singleRequest(request) map {response => - if(response.status == StatusCodes.OK){ + Await.result(Http(system).singleRequest(request) map { response => + if (response.status == StatusCodes.OK) { log.info("Successfully deregistered from Instance Registry.") Success() } @@ -232,14 +227,14 @@ object InstanceRegistry extends JsonSupport with AppLogging Failure(new RuntimeException(s"Failed to deregister from Instance Registry, server returned $statuscode")) } - } recover {case ex => + } recover { case ex => log.warning(s"Failed to deregister to Instance Registry, exception: $ex") Failure(ex) }, Duration.Inf) } } - def postInstance(instance : Instance, uri: String) () : Future[HttpResponse] = { + def postInstance(instance: Instance, uri: String)(): Future[HttpResponse] = { try { val request = HttpRequest(method = HttpMethods.POST, uri = uri, entity = instance.toJson(instanceFormat).toString()) Http(system).singleRequest(request) @@ -251,14 +246,14 @@ object InstanceRegistry extends JsonSupport with AppLogging } - private def createInstance(id: Option[Long], controlPort : Int, name : String) : Instance = + private def createInstance(id: Option[Long], controlPort: Int, name: String): Instance = Instance(id, InetAddress.getLocalHost.getHostAddress, controlPort, name, ComponentType.WebApi, None, InstanceState.Running, List.empty[String]) - def reportStart(id: String, configuration: Configuration):Try[ResponseEntity] ={ + def reportStart(id: String, configuration: Configuration): Try[ResponseEntity] = { val request = HttpRequest(method = HttpMethods.GET, configuration.instanceRegistryUri + "/reportStart") - Await.result(Http(system).singleRequest(request) map {response => - if(response.status == StatusCodes.OK){ + Await.result(Http(system).singleRequest(request) map { response => + if (response.status == StatusCodes.OK) { Success(response.entity) } else { @@ -266,17 +261,17 @@ object InstanceRegistry extends JsonSupport with AppLogging log.warning(s"Failed to perform reportStart, server returned $statuscode") Failure(new RuntimeException(s"Failed to perform reportStart, server returned $statuscode")) } - } recover {case ex => + } recover { case ex => log.warning(s"Failed to perform reportStart, exception: $ex") Failure(new RuntimeException(s"Failed to perform reportStart, server returned, exception: $ex")) }, Duration.Inf) } - def reportFailure(id: String, configuration: Configuration):Try[ResponseEntity] = { + def reportFailure(id: String, configuration: Configuration): Try[ResponseEntity] = { val request = HttpRequest(method = HttpMethods.GET, configuration.instanceRegistryUri + "/reportFailure") - Await.result(Http(system).singleRequest(request) map {response => - if(response.status == StatusCodes.OK){ + Await.result(Http(system).singleRequest(request) map { response => + if (response.status == StatusCodes.OK) { Success(response.entity) } else { @@ -284,17 +279,17 @@ object InstanceRegistry extends JsonSupport with AppLogging log.warning(s"Failed to perform reportFailure, server returned $statuscode") Failure(new RuntimeException(s"Failed to perform reportFailure, server returned $statuscode")) } - } recover {case ex => + } recover { case ex => log.warning(s"Failed to perform reportFailure, server returned, exception: $ex") Failure(new RuntimeException(s"Failed to perform reportFailure, server returned, exception: $ex")) }, Duration.Inf) } - def reportStop(id: String, configuration: Configuration):Try[ResponseEntity] = { + def reportStop(id: String, configuration: Configuration): Try[ResponseEntity] = { val request = HttpRequest(method = HttpMethods.GET, configuration.instanceRegistryUri + "/reportStop") - Await.result(Http(system).singleRequest(request) map {response => - if(response.status == StatusCodes.OK){ + Await.result(Http(system).singleRequest(request) map { response => + if (response.status == StatusCodes.OK) { Success(response.entity) } else { @@ -302,18 +297,18 @@ object InstanceRegistry extends JsonSupport with AppLogging log.warning(s"Failed to perform reportStop, server returned $statuscode") Failure(new RuntimeException(s"Failed to perform reportStop, server returned $statuscode")) } - } recover {case ex => + } recover { case ex => log.warning(s"Failed to perform reportStop, server returned, exception: $ex") Failure(new RuntimeException(s"Failed to perform reportStop, server returned, exception: $ex")) }, Duration.Inf) } object ReportOperationType extends Enumeration { - val Start : Value = Value("Start") - val Stop : Value = Value("Stop") - val Failure : Value = Value("Failure") + val Start: Value = Value("Start") + val Stop: Value = Value("Stop") + val Failure: Value = Value("Failure") - def toOperationUriString(operation: ReportOperationType.Value, id: Long) : String = { + def toOperationUriString(operation: ReportOperationType.Value, id: Long): String = { operation match { case Start => s"/reportStart?Id=$id" @@ -324,4 +319,5 @@ object InstanceRegistry extends JsonSupport with AppLogging } } } + } \ No newline at end of file diff --git a/src/main/scala/de/upb/cs/swt/delphi/webapi/DelphiRoutes.scala b/src/main/scala/de/upb/cs/swt/delphi/webapi/DelphiRoutes.scala new file mode 100644 index 0000000..eab3de9 --- /dev/null +++ b/src/main/scala/de/upb/cs/swt/delphi/webapi/DelphiRoutes.scala @@ -0,0 +1,159 @@ +// Copyright (C) 2018 The Delphi Team. +// See the LICENCE file distributed with this work for additional +// information regarding copyright ownership. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package de.upb.cs.swt.delphi.webapi + +import akka.NotUsed +import akka.actor.ActorRef +import akka.http.scaladsl.model._ +import akka.http.scaladsl.server.Directives._ +import akka.http.scaladsl.server.Route +import akka.pattern.ask +import akka.stream.scaladsl.Source +import de.upb.cs.swt.delphi.webapi.IpLogActor._ +import de.upb.cs.swt.delphi.webapi.StatisticsJson._ +import de.upb.cs.swt.delphi.webapi.artifacts.ArtifactJson._ +import de.upb.cs.swt.delphi.webapi.search.QueryRequestJson._ +import de.upb.cs.swt.delphi.webapi.search.{QueryRequest, SearchQuery} +import spray.json._ + +import scala.concurrent.duration._ +import scala.util.{Failure, Success} + + +class DelphiRoutes(requestLimiter: RequestLimitScheduler) extends JsonSupport with AppLogging { + + def routes: Route = { + requestLimiter.acceptOnValidLimit { + apiRoutes + } + } + + def apiRoutes: Route = { + path("version") { + version + } ~ + path("features") { + features + } ~ + path("statistics") { + statistics + } ~ + pathPrefix("search") { + search + } ~ + pathPrefix("retrieve" / Remaining) { identifier => retrieve(identifier) + } + } + + private def version = { + get { + complete { + BuildInfo.version + } + } + } + + private val featureExtractor = new FeatureQuery(configuration) + + private def features = { + get { + parameter('pretty.?) { (pretty) => + complete( + //TODO: Introduce failure concept for feature extractor + prettyPrint(pretty, featureExtractor.featureList.toJson) + ) + } + } + } + + + private def statistics = { + get { + parameter('pretty.?) { (pretty) => + complete { + val result = new StatisticsQuery(configuration).retrieveStandardStatistics + result match { + case Some(stats) => { + prettyPrint(pretty, stats.toJson) + } + case _ => HttpResponse(StatusCodes.InternalServerError) + } + } + } + } + } + + private def retrieve(identifier: String): Route = { + get { + parameter('pretty.?) { (pretty) => + complete( + RetrieveQuery.retrieve(identifier) match { + case Some(result) => prettyPrint(pretty, result.toJson) + case None => HttpResponse(StatusCodes.NotFound) + } + ) + } + } + } + + def search: Route = { + post { + parameter('pretty.?) { (pretty) => + entity(as[QueryRequest]) { input => + log.info(s"Received search query: ${input.query}") + complete( + new SearchQuery(configuration, featureExtractor).search(input) match { + case Success(result) => prettyPrint(pretty, result.toJson) + case Failure(e) => e.getMessage + } + ) + } + } + } + } +} + +object DelphiRoutes { + + private val ipLogActor = system.actorOf(IpLogActor.props) + private val requestLimiter = new RequestLimitScheduler(ipLogActor) + private val routes = new DelphiRoutes(requestLimiter).routes + + def apply(): Route = routes +} + +private final class RequestLimitScheduler(ipLogActor: ActorRef) extends JsonSupport { + Source.tick(0.second, refreshRate, NotUsed) + .runForeach(_ => { + ipLogActor ! Reset + })(materializer) + + def acceptOnValidLimit(apiRoutes: Route): Route = { + extractClientIP { ip => + val promise = (ipLogActor ? Accepted(ip.toString())).mapTo[Boolean] + onSuccess(promise) { + success => + if (success) { + apiRoutes + } else { + val res = Map("msg" -> "Request limit exceeded") + complete(res.toJson) + } + } + } + } +} \ No newline at end of file diff --git a/src/main/scala/de/upb/cs/swt/delphi/webapi/IpLogActor.scala b/src/main/scala/de/upb/cs/swt/delphi/webapi/IpLogActor.scala new file mode 100644 index 0000000..c411953 --- /dev/null +++ b/src/main/scala/de/upb/cs/swt/delphi/webapi/IpLogActor.scala @@ -0,0 +1,63 @@ +// Copyright (C) 2018 The Delphi Team. +// See the LICENCE file distributed with this work for additional +// information regarding copyright ownership. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package de.upb.cs.swt.delphi.webapi + +import akka.actor.{Actor, Props} +import de.upb.cs.swt.delphi.webapi.IpLogActor._ + +import scala.collection.mutable.ListBuffer + +/** + * @author Hariharan. + */ + + +class IpLogActor extends Actor with AppLogging { + private var ipStore = ListBuffer[String]() + + + override def receive: Receive = { + case Add(ip) => { + ipStore += ip + } + case Reset => { + ipStore = ListBuffer[String]() + } + case Accepted(ip) => { + val hostname = ip.split(":")(0) + self ! Add(hostname) + val noOfReqFromIp = ipStore.count(_.equals(hostname)) + val validTotalReq = ipStore.size < maxTotalNoRequest + val validIndividualReq = noOfReqFromIp < maxIndividualReq + val validReq = validTotalReq && validIndividualReq + val accept = if (validReq) true else false + sender() ! accept + } + } +} + + +object IpLogActor { + + final case class Add(ip: String) + + case object Reset + + final case class Accepted(ip: String) + + def props: Props = Props(new IpLogActor) +} diff --git a/src/main/scala/de/upb/cs/swt/delphi/webapi/Server.scala b/src/main/scala/de/upb/cs/swt/delphi/webapi/Server.scala index d59dd29..824200d 100644 --- a/src/main/scala/de/upb/cs/swt/delphi/webapi/Server.scala +++ b/src/main/scala/de/upb/cs/swt/delphi/webapi/Server.scala @@ -16,114 +16,19 @@ package de.upb.cs.swt.delphi.webapi -import java.util.concurrent.TimeUnit - -import akka.actor.ActorSystem -import akka.http.scaladsl.model.{HttpResponse, StatusCodes} import akka.http.scaladsl.server.{HttpApp, Route} -import akka.stream.ActorMaterializer -import akka.util.Timeout import de.upb.cs.swt.delphi.instancemanagement.InstanceRegistry -import de.upb.cs.swt.delphi.webapi.artifacts.ArtifactJson._ -import de.upb.cs.swt.delphi.webapi.search.QueryRequestJson._ -import de.upb.cs.swt.delphi.webapi.search.{QueryRequest, SearchQuery} -import spray.json._ - -import scala.concurrent.ExecutionContext -import scala.util.{Failure, Success} /** * Web server configuration for Delphi web API. */ object Server extends HttpApp with JsonSupport with AppLogging { + val delphiRoutes = DelphiRoutes() - implicit val system = ActorSystem("delphi-webapi") - implicit val materializer = ActorMaterializer() - - private implicit val configuration = new Configuration() - private implicit val timeout = Timeout(5, TimeUnit.SECONDS) - - - override def routes: Route = - path("version") { - version - } ~ - path("features") { - features - } ~ - path("statistics") { - statistics - } ~ - pathPrefix("search" ) { search } ~ - pathPrefix("retrieve" / Remaining) { identifier => retrieve(identifier) } - - - private def version = { - get { - complete { - BuildInfo.version - } - } - } - - private val featureExtractor = new FeatureQuery(configuration) - - private def features = { - get { - parameter('pretty.?) { (pretty) => - complete( - //TODO: Introduce failure concept for feature extractor - prettyPrint(pretty, featureExtractor.featureList.toJson) - ) - } - } - } - - private def statistics = { - get { - parameter('pretty.?) { (pretty) => - complete { - val result = new StatisticsQuery(configuration).retrieveStandardStatistics - result match { - case Some(stats) => { - import StatisticsJson._ - prettyPrint(pretty, stats.toJson) - } - case _ => HttpResponse(StatusCodes.InternalServerError) - } - } - } - } + override def routes: Route = { + delphiRoutes } - private def retrieve(identifier: String): Route = { - get { - parameter('pretty.?) { (pretty) => - complete( - RetrieveQuery.retrieve(identifier) match { - case Some(result) => prettyPrint(pretty, result.toJson) - case None => HttpResponse(StatusCodes.NotFound) - } - ) - } - } - } - - def search: Route = { - post { - parameter('pretty.?) { (pretty) => - entity(as[QueryRequest]) { input => - log.info(s"Received search query: ${input.query}") - complete( - new SearchQuery(configuration, featureExtractor).search(input) match { - case Success(result) => prettyPrint(pretty, result.toJson) - case Failure(e) => e.getMessage - } - ) - } - } - } - } def main(args: Array[String]): Unit = { sys.addShutdownHook({ @@ -134,7 +39,6 @@ object Server extends HttpApp with JsonSupport with AppLogging { StartupCheck.check(configuration) Server.startServer(configuration.bindHost, configuration.bindPort, system) - implicit val ec: ExecutionContext = system.dispatcher val terminationFuture = system.terminate() terminationFuture.onComplete { diff --git a/src/main/scala/de/upb/cs/swt/delphi/webapi/package.scala b/src/main/scala/de/upb/cs/swt/delphi/webapi/package.scala new file mode 100644 index 0000000..907f44d --- /dev/null +++ b/src/main/scala/de/upb/cs/swt/delphi/webapi/package.scala @@ -0,0 +1,52 @@ +// Copyright (C) 2018 The Delphi Team. +// See the LICENCE file distributed with this work for additional +// information regarding copyright ownership. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package de.upb.cs.swt.delphi + +import java.util.concurrent.TimeUnit + +import akka.actor.ActorSystem +import akka.stream.ActorMaterializer +import akka.util.Timeout + +import scala.concurrent.ExecutionContext +import scala.concurrent.duration._ + + +package object webapi { + + implicit val system: ActorSystem = ActorSystem("delphi-webapi") + implicit val materializer: ActorMaterializer = ActorMaterializer() + implicit val ec: ExecutionContext = system.dispatcher + + implicit val configuration: Configuration = new Configuration() + + val defaultTimeout = 5 + implicit val timeout: Timeout = Timeout(defaultTimeout, TimeUnit.SECONDS) + + /** + * Maximum no of requests allowed until `refreshRate` is triggered. + */ + val maxTotalNoRequest = 2000 + /** + * Maximum no of requests allowed for an individual until `refreshRate` is triggered. + */ + val maxIndividualReq = 200 + /** + * Used by `Source.tick` to refresh ip log periodically + */ + val refreshRate: FiniteDuration = 5.minutes +}