diff --git a/.gitignore b/.gitignore index e051c58..e21d42e 100644 --- a/.gitignore +++ b/.gitignore @@ -3,3 +3,4 @@ .idea project/target target +local-path \ No newline at end of file diff --git a/build.sbt b/build.sbt index bc0ac1d..2b507e0 100644 --- a/build.sbt +++ b/build.sbt @@ -24,6 +24,8 @@ libraryDependencies ++= Seq( "com.sksamuel.elastic4s" %% "elastic4s-http-streams" % elastic4sVersion, ) +libraryDependencies += "com.pauldijou" %% "jwt-core" % "1.0.0" + libraryDependencies += "org.parboiled" %% "parboiled" % "2.1.4" libraryDependencies += "io.spray" %% "spray-json" % "1.3.3" libraryDependencies += "org.scalactic" %% "scalactic" % "3.0.4" @@ -54,7 +56,9 @@ scalastyleConfig := baseDirectory.value / "project" / "scalastyle-config.xml" // Pinning secure versions of insecure transitive libraryDependencies // Please update when updating dependencies above (including Play plugin) libraryDependencies ++= Seq( - "com.fasterxml.jackson.core" % "jackson-databind" % "2.9.7" + "com.fasterxml.jackson.core" % "jackson-databind" % "2.9.9" ) trapExit := false +fork := true +connectInput := true \ No newline at end of file diff --git a/project/build.properties b/project/build.properties index 9f782f7..c46277f 100644 --- a/project/build.properties +++ b/project/build.properties @@ -1 +1 @@ -sbt.version=1.1.1 \ No newline at end of file +sbt.version=1.2.7 \ No newline at end of file diff --git a/src/it/scala/de/upb/cs/swt/delphi/webapi/RequestLimitCheck.scala b/src/it/scala/de/upb/cs/swt/delphi/webapi/RequestLimitCheck.scala new file mode 100644 index 0000000..7a8e6b3 --- /dev/null +++ b/src/it/scala/de/upb/cs/swt/delphi/webapi/RequestLimitCheck.scala @@ -0,0 +1,80 @@ +// Copyright (C) 2018 The Delphi Team. +// See the LICENCE file distributed with this work for additional +// information regarding copyright ownership. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package de.upb.cs.swt.delphi.webapi + +import akka.http.scaladsl.Http +import akka.http.scaladsl.model.{HttpRequest, HttpResponse} +import akka.http.scaladsl.unmarshalling.{Unmarshal, Unmarshaller} +import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpec} +import spray.json._ + +import scala.concurrent.duration._ +import scala.concurrent.{Await, Future} +import scala.util.{Failure, Success} + +/** + * @author Hariharan. + */ +class RequestLimitCheck extends WordSpec with Matchers with BeforeAndAfterAll with JsonSupport { + val delphiRoutes = DelphiRoutes() + val serverBinding: Future[Http.ServerBinding] = Http() + .bindAndHandle(delphiRoutes, "localhost", 8085) + + override protected def beforeAll(): Unit = { + serverBinding.onComplete { + case Success(server) => + println(s"Server started at http://${server.localAddress.getHostString}:${server.localAddress.getPort}/") + case Failure(e) => + e.printStackTrace() + sys.exit(0) + } + } + + + "Requests" should { + "throttle when limit reached" in { + def responseFuture: Future[HttpResponse] = Http() + .singleRequest(HttpRequest(uri = "http://localhost:8085/version")) + + //Completing request limit + for (i <- (1 to maxIndividualReq)) { + Await.result(responseFuture, 1.second) + } + case class LimitMsg(msg: String) + implicit val msgFormat = jsonFormat1(LimitMsg) + + val limitReachedFuture = responseFuture + limitReachedFuture.onComplete { + case Success(res) => { + val msgPromise = Unmarshal(res.entity).to[LimitMsg] + msgPromise.onComplete { + case Success(limitMsg) => { + assertResult("Request limit exceeded")(limitMsg.msg) + } + case Failure(exception) => { + fail(exception) + } + } + } + case Failure(exception) => { + fail(exception) + } + } + Await.result(system.terminate(), 5.seconds) + } + } +} diff --git a/src/it/scala/de/upb/cs/swt/delphi/webapi/SearchQueryTest.scala b/src/it/scala/de/upb/cs/swt/delphi/webapi/SearchQueryTest.scala index 1f157d2..708d084 100644 --- a/src/it/scala/de/upb/cs/swt/delphi/webapi/SearchQueryTest.scala +++ b/src/it/scala/de/upb/cs/swt/delphi/webapi/SearchQueryTest.scala @@ -16,17 +16,24 @@ package de.upb.cs.swt.delphi.webapi -import de.upb.cs.swt.delphi.webapi.search.{QueryRequest, SearchQuery} +import de.upb.cs.swt.delphi.webapi.search.{QueryRequest, SearchError, SearchQuery} import org.scalatest.{FlatSpec, Matchers} -import scala.util.Success +import scala.util.Failure class SearchQueryTest extends FlatSpec with Matchers { - "Search query" should "check for fields" in { + "Search query" should "fail on large request limit" in { val configuration = new Configuration() val q = new SearchQuery(configuration, new FeatureQuery(configuration)) - - val response = q.search(QueryRequest("[if_icmpeq (opcode:159)]>1")) - response shouldBe a [Success[_]] + val size = 20000 + val response = q.search(QueryRequest("[dstore_1 (opcode:72)]<1", Some(size))) + response match { + case Failure(exception) => { + exception shouldBe a[SearchError] + } + case _ => { + fail("Limit exceeded should fail") + } + } } } diff --git a/src/main/scala/de/upb/cs/swt/delphi/instancemanagement/Instance.scala b/src/main/scala/de/upb/cs/swt/delphi/instancemanagement/Instance.scala index db29ab8..1d9babc 100644 --- a/src/main/scala/de/upb/cs/swt/delphi/instancemanagement/Instance.scala +++ b/src/main/scala/de/upb/cs/swt/delphi/instancemanagement/Instance.scala @@ -15,72 +15,134 @@ // limitations under the License. package de.upb.cs.swt.delphi.instancemanagement + import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport +import de.upb.cs.swt.delphi.instancemanagement.InstanceEnums.{ComponentType, InstanceState} import spray.json.{DefaultJsonProtocol, DeserializationException, JsString, JsValue, JsonFormat} -trait JsonSupport extends SprayJsonSupport with DefaultJsonProtocol { +/** + * Trait defining the implicit JSON formats needed to work with Instances + */ +trait InstanceJsonSupport extends SprayJsonSupport with DefaultJsonProtocol with InstanceLinkJsonSupport { - implicit val componentTypeFormat : JsonFormat[InstanceEnums.ComponentType] = new JsonFormat[InstanceEnums.ComponentType] { + //Custom JSON format for an ComponentType + implicit val componentTypeFormat : JsonFormat[ComponentType] = new JsonFormat[ComponentType] { - def write(compType : InstanceEnums.ComponentType) = JsString(compType.toString) + /** + * Custom write method for serializing an ComponentType + * @param compType The ComponentType to serialize + * @return JsString containing the serialized value + */ + def write(compType : ComponentType) = JsString(compType.toString) - def read(value: JsValue) : InstanceEnums.ComponentType = value match { + /** + * Custom read method for deserialization of an ComponentType + * @param value JsValue to deserialize (must be a JsString) + * @return ComponentType that has been read + * @throws DeserializationException Exception thrown when JsValue is in incorrect format + */ + def read(value: JsValue) : ComponentType = value match { case JsString(s) => s match { - case "Crawler" => InstanceEnums.ComponentType.Crawler - case "WebApi" => InstanceEnums.ComponentType.WebApi - case "WebApp" => InstanceEnums.ComponentType.WebApp - case "DelphiManagement" => InstanceEnums.ComponentType.DelphiManagement - case "ElasticSearch" => InstanceEnums.ComponentType.ElasticSearch + case "Crawler" => ComponentType.Crawler + case "WebApi" => ComponentType.WebApi + case "WebApp" => ComponentType.WebApp + case "DelphiManagement" => ComponentType.DelphiManagement + case "ElasticSearch" => ComponentType.ElasticSearch case x => throw DeserializationException(s"Unexpected string value $x for component type.") } - case y => throw DeserializationException(s"Unexpected type $y while deserializing component type.") + case y => throw DeserializationException(s"Unexpected type $y during deserialization component type.") } } - implicit val stateFormat : JsonFormat[InstanceEnums.State] = new JsonFormat[InstanceEnums.State] { + //Custom JSON format for an InstanceState + implicit val stateFormat : JsonFormat[InstanceState] = new JsonFormat[InstanceState] { - def write(compType : InstanceEnums.State) = JsString(compType.toString) + /** + * Custom write method for serializing an InstanceState + * @param state The InstanceState to serialize + * @return JsString containing the serialized value + */ + def write(state : InstanceState) = JsString(state.toString) - def read(value: JsValue) : InstanceEnums.State = value match { + /** + * Custom read method for deserialization of an InstanceState + * @param value JsValue to deserialize (must be a JsString) + * @return InstanceState that has been read + * @throws DeserializationException Exception thrown when JsValue is in incorrect format + */ + def read(value: JsValue) : InstanceState = value match { case JsString(s) => s match { - case "Running" => InstanceEnums.InstanceState.Running - case "Stopped" => InstanceEnums.InstanceState.Stopped - case "Failed" => InstanceEnums.InstanceState.Failed - case "Paused" => InstanceEnums.InstanceState.Paused - case "NotReachable" => InstanceEnums.InstanceState.NotReachable + case "Running" => InstanceState.Running + case "Stopped" => InstanceState.Stopped + case "Failed" => InstanceState.Failed + case "Paused" => InstanceState.Paused + case "NotReachable" => InstanceState.NotReachable + case "Deploying" => InstanceState.Deploying case x => throw DeserializationException(s"Unexpected string value $x for instance state.") } - case y => throw DeserializationException(s"Unexpected type $y while deserializing instance state.") + case y => throw DeserializationException(s"Unexpected type $y during deserialization instance state.") } } - implicit val instanceFormat : JsonFormat[Instance] = jsonFormat7(Instance) + //JSON format for Instances + implicit val instanceFormat : JsonFormat[Instance] = jsonFormat10(Instance) } +/** + * The instance type used for transmitting data about an instance from an to the registry + * @param id Id of the instance. This is an Option[Long], as an registering instance will not yet have an id. + * @param host Host of the instance. + * @param portNumber Port the instance is reachable at. + * @param name Name of the instance + * @param componentType ComponentType of the instance. + * @param dockerId The docker container id of the instance. This is an Option[String], as not all instance have to be docker containers. + * @param instanceState State of the instance + */ final case class Instance ( id: Option[Long], host: String, portNumber: Long, name: String, - componentType: InstanceEnums.ComponentType, + componentType: ComponentType, dockerId: Option[String], - instanceState: InstanceEnums.State + instanceState: InstanceState, + labels: List[String], + linksTo: List[InstanceLink], + linksFrom: List[InstanceLink] ) + +/** + * Enumerations concerning instances + */ object InstanceEnums { + + //Type to use when working with component types type ComponentType = ComponentType.Value + + /** + * ComponentType enumeration defining the valid types of delphi components + */ object ComponentType extends Enumeration { val Crawler : Value = Value("Crawler") - val ElasticSearch : Value = Value("ElasticSearch") val WebApi : Value = Value("WebApi") val WebApp : Value = Value("WebApp") val DelphiManagement : Value = Value("DelphiManagement") + val ElasticSearch : Value = Value("ElasticSearch") } - type State = InstanceState.Value + + //Type to use when working with instance states + type InstanceState = InstanceState.Value + + /** + * InstanceState enumeration defining the valid states for instances of delphi components + */ object InstanceState extends Enumeration { + val Deploying : Value = Value("Deploying") val Running : Value = Value("Running") - val Paused : Value = Value("Paused") val Stopped : Value = Value("Stopped") val Failed : Value = Value("Failed") + val Paused : Value = Value("Paused") val NotReachable : Value = Value("NotReachable") } -} + +} \ No newline at end of file diff --git a/src/main/scala/de/upb/cs/swt/delphi/instancemanagement/InstanceLink.scala b/src/main/scala/de/upb/cs/swt/delphi/instancemanagement/InstanceLink.scala new file mode 100644 index 0000000..9377251 --- /dev/null +++ b/src/main/scala/de/upb/cs/swt/delphi/instancemanagement/InstanceLink.scala @@ -0,0 +1,53 @@ +// Copyright (C) 2018 The Delphi Team. +// See the LICENCE file distributed with this work for additional +// information regarding copyright ownership. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package de.upb.cs.swt.delphi.instancemanagement + +import LinkEnums.LinkState +import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport +import spray.json.{DefaultJsonProtocol, DeserializationException, JsString, JsValue, JsonFormat} + +trait InstanceLinkJsonSupport extends SprayJsonSupport with DefaultJsonProtocol { + + implicit val linkStateFormat: JsonFormat[LinkState] = new JsonFormat[LinkState] { + override def read(value: JsValue): LinkState = value match { + case JsString(s) => s match { + case "Assigned" => LinkState.Assigned + case "Outdated" => LinkState.Outdated + case "Failed" => LinkState.Failed + case x => throw DeserializationException(s"Unexpected string value $x for LinkState.") + } + case y => throw DeserializationException(s"Unexpected type $y during deserialization of LinkState") + } + + override def write(linkState: LinkState): JsValue = JsString(linkState.toString) + } + + implicit val instanceLinkFormat: JsonFormat[InstanceLink] = + jsonFormat3(InstanceLink) +} + + +final case class InstanceLink(idFrom: Long, idTo:Long, linkState: LinkState) + +object LinkEnums { + type LinkState = LinkState.Value + + object LinkState extends Enumeration { + val Assigned: Value = Value("Assigned") + val Failed: Value = Value("Failed") + val Outdated: Value = Value("Outdated") + } +} \ No newline at end of file diff --git a/src/main/scala/de/upb/cs/swt/delphi/instancemanagement/InstanceRegistry.scala b/src/main/scala/de/upb/cs/swt/delphi/instancemanagement/InstanceRegistry.scala index f01d42d..6a7517c 100644 --- a/src/main/scala/de/upb/cs/swt/delphi/instancemanagement/InstanceRegistry.scala +++ b/src/main/scala/de/upb/cs/swt/delphi/instancemanagement/InstanceRegistry.scala @@ -18,31 +18,27 @@ package de.upb.cs.swt.delphi.instancemanagement import java.net.InetAddress -import akka.actor.ActorSystem import akka.http.scaladsl.Http import akka.http.scaladsl.model._ +import akka.http.scaladsl.model.headers.RawHeader import akka.http.scaladsl.unmarshalling.Unmarshal -import akka.stream.ActorMaterializer import akka.util.ByteString import de.upb.cs.swt.delphi.instancemanagement.InstanceEnums.{ComponentType, InstanceState} -import de.upb.cs.swt.delphi.webapi.{AppLogging, Configuration, Server} +import de.upb.cs.swt.delphi.webapi.authorization.AuthProvider +import de.upb.cs.swt.delphi.webapi.{AppLogging, Configuration, _} +import spray.json._ -import scala.concurrent.{Await, ExecutionContext, Future} import scala.concurrent.duration._ +import scala.concurrent.{Await, Future} import scala.util.{Failure, Success, Try} -import spray.json._ -object InstanceRegistry extends JsonSupport with AppLogging -{ +object InstanceRegistry extends InstanceJsonSupport with AppLogging { - implicit val system : ActorSystem = Server.system - implicit val ec : ExecutionContext = system.dispatcher - implicit val materializer : ActorMaterializer = Server.materializer - lazy val instanceIdFromEnv : Option[Long] = Try[Long](sys.env("INSTANCE_ID").toLong).toOption + lazy val instanceIdFromEnv: Option[Long] = Try[Long](sys.env("INSTANCE_ID").toLong).toOption - def handleInstanceStart(configuration: Configuration) : Option[Long] = { + def handleInstanceStart(configuration: Configuration): Option[Long] = { instanceIdFromEnv match { case Some(id) => reportStart(configuration) match { @@ -57,49 +53,52 @@ object InstanceRegistry extends JsonSupport with AppLogging } } - def handleInstanceStop(configuration: Configuration) : Try[Unit] = { - if(instanceIdFromEnv.isDefined) { + def handleInstanceStop(configuration: Configuration): Try[Unit] = { + if (instanceIdFromEnv.isDefined) { reportStop(configuration) } else { deregister(configuration) } } - def handleInstanceFailure(configuration: Configuration) : Try[Unit] = { - if(instanceIdFromEnv.isDefined) { + def handleInstanceFailure(configuration: Configuration): Try[Unit] = { + if (instanceIdFromEnv.isDefined) { reportFailure(configuration) } else { deregister(configuration) } } - def reportStart(configuration: Configuration) : Try[Unit] = executeReportOperation(configuration, ReportOperationType.Start) + def reportStart(configuration: Configuration): Try[Unit] = executeReportOperation(configuration, ReportOperationType.Start) - def reportStop(configuration: Configuration) : Try[Unit] = { - if(configuration.usingInstanceRegistry) { + def reportStop(configuration: Configuration): Try[Unit] = { + if (configuration.usingInstanceRegistry) { executeReportOperation(configuration, ReportOperationType.Stop) } else { Failure(new RuntimeException("Cannot report stop, no instance registry available.")) } } - def reportFailure(configuration: Configuration) : Try[Unit] = { - if(configuration.usingInstanceRegistry){ + def reportFailure(configuration: Configuration): Try[Unit] = { + if (configuration.usingInstanceRegistry) { executeReportOperation(configuration, ReportOperationType.Failure) } else { Failure(new RuntimeException("Cannot report failure, no instance registry available.")) } } - private def executeReportOperation(configuration: Configuration, operationType: ReportOperationType.Value) : Try[Unit] = { + private def executeReportOperation(configuration: Configuration, operationType: ReportOperationType.Value): Try[Unit] = { instanceIdFromEnv match { case Some(id) => val request = HttpRequest( method = HttpMethods.POST, configuration.instanceRegistryUri + ReportOperationType.toOperationUriString(operationType, id)) - Await.result(Http(system).singleRequest(request) map {response => - if(response.status == StatusCodes.OK){ + val useGenericNameForToken = operationType == ReportOperationType.Start //Must use generic name for startup, no id known at that point + + Await.result(Http(system).singleRequest(request.withHeaders(RawHeader("Authorization", + s"Bearer ${AuthProvider.generateJwt(useGenericName = useGenericNameForToken)}"))) map { response => + if (response.status == StatusCodes.OK) { log.info(s"Successfully reported ${operationType.toString} to Instance Registry.") Success() } @@ -108,7 +107,7 @@ object InstanceRegistry extends JsonSupport with AppLogging Failure(new RuntimeException(s"Failed to report ${operationType.toString} to Instance Registry, server returned ${response.status}")) } - } recover {case ex => + } recover { case ex => log.warning(s"Failed to report ${operationType.toString} to Instance Registry, exception: $ex") Failure(new RuntimeException(s"Failed to report ${operationType.toString} to Instance Registry, exception: $ex")) }, Duration.Inf) @@ -117,11 +116,12 @@ object InstanceRegistry extends JsonSupport with AppLogging Failure(new RuntimeException(s"Cannot report ${operationType.toString} to Instance Registry, no instance id is present in env var 'INSTANCE_ID'.")) } } - def register(configuration: Configuration) :Try[Long] = { - val instance = createInstance(None,configuration.bindPort, configuration.instanceName) - Await.result(postInstance(instance, configuration.instanceRegistryUri + "/register") map {response => - if(response.status == StatusCodes.OK){ + def register(configuration: Configuration): Try[Long] = { + val instance = createInstance(None, configuration.bindPort, configuration.instanceName) + + Await.result(postInstance(instance, configuration.instanceRegistryUri + "/instances/register") map { response => + if (response.status == StatusCodes.OK) { Await.result(Unmarshal(response.entity).to[String] map { assignedID => val id = assignedID.toLong log.info(s"Successfully registered at Instance Registry, got ID $id.") @@ -137,23 +137,25 @@ object InstanceRegistry extends JsonSupport with AppLogging Failure(new RuntimeException(s"Failed to register at Instance Registry, server returned $statuscode")) } - } recover {case ex => + } recover { case ex => log.warning(s"Failed to register at Instance Registry, exception: $ex") Failure(ex) }, Duration.Inf) } - def retrieveElasticSearchInstance(configuration: Configuration) : Try[Instance] = { - if(!configuration.usingInstanceRegistry) { + def retrieveElasticSearchInstance(configuration: Configuration): Try[Instance] = { + if (!configuration.usingInstanceRegistry) { Failure(new RuntimeException("Cannot get ElasticSearch instance from Instance Registry, no Instance Registry available.")) } else { - val request = HttpRequest(method = HttpMethods.GET, configuration.instanceRegistryUri + "/matchingInstance?ComponentType=ElasticSearch") + val request = HttpRequest(method = HttpMethods.GET, + configuration.instanceRegistryUri + + s"/instances/${configuration.assignedID.getOrElse(-1)}/matchingInstance?ComponentType=ElasticSearch") - Await.result(Http(system).singleRequest(request) map {response => + Await.result(Http(system).singleRequest(request.withHeaders(RawHeader("Authorization",s"Bearer ${AuthProvider.generateJwt()}"))) map { response => response.status match { case StatusCodes.OK => try { - val instanceString : String = Await.result(response.entity.dataBytes.runFold(ByteString(""))(_ ++ _).map(_.utf8String), 5 seconds) + val instanceString: String = Await.result(response.entity.dataBytes.runFold(ByteString(""))(_ ++ _).map(_.utf8String), 5 seconds) val esInstance = instanceString.parseJson.convertTo[Instance](instanceFormat) val elasticIP = esInstance.host log.info(s"Instance Registry assigned ElasticSearch instance at $elasticIP") @@ -178,32 +180,36 @@ object InstanceRegistry extends JsonSupport with AppLogging } } - def sendMatchingResult(isElasticSearchReachable : Boolean, configuration: Configuration) : Try[Unit] = { + def sendMatchingResult(isElasticSearchReachable: Boolean, configuration: Configuration): Try[Unit] = { - if(!configuration.usingInstanceRegistry) { + if (!configuration.usingInstanceRegistry) { Failure(new RuntimeException("Cannot post matching result to Instance Registry, no Instance Registry available.")) } else { - if(configuration.elasticsearchInstance.id.isEmpty) { + if (configuration.elasticsearchInstance.id.isEmpty) { Failure(new RuntimeException("Cannot post matching result to Instance Registry, assigned ElasticSearch instance has no ID.")) } else { val idToPost = configuration.elasticsearchInstance.id.getOrElse(-1L) + + val matchingData = JsObject("MatchingSuccessful" -> JsBoolean(isElasticSearchReachable), + "SenderId" -> JsNumber(configuration.assignedID.getOrElse(-1L))) + val request = HttpRequest( method = HttpMethods.POST, - configuration.instanceRegistryUri + s"/matchingResult?Id=$idToPost&MatchingSuccessful=$isElasticSearchReachable") + configuration.instanceRegistryUri + s"/instances/$idToPost/matchingResult") - Await.result(Http(system).singleRequest(request) map {response => - val status=response.status - if(response.status == StatusCodes.OK){ + Await.result(Http(system).singleRequest(request + .withHeaders(RawHeader("Authorization",s"Bearer ${AuthProvider.generateJwt()}")) + .withEntity(ContentTypes.`application/json`, ByteString(matchingData.toJson.toString))) map { response => + if (response.status == StatusCodes.OK) { log.info(s"Successfully posted matching result to Instance Registry.") Success() } else { - val statuscode = response.status - log.warning(s"Failed to post matching result to Instance Registry, server returned $statuscode") - Failure(new RuntimeException(s"Failed to post matching result to Instance Registry, server returned $statuscode")) + log.warning(s"Failed to post matching result to Instance Registry, server returned ${response.status}") + Failure(new RuntimeException(s"Failed to post matching result to Instance Registry, server returned ${response.status}")) } - } recover {case ex => + } recover { case ex => log.warning(s"Failed to post matching result to Instance Registry, exception: $ex") Failure(new RuntimeException(s"Failed to post matching result tot Instance Registry, exception: $ex")) }, Duration.Inf) @@ -212,16 +218,17 @@ object InstanceRegistry extends JsonSupport with AppLogging } - def deregister(configuration: Configuration) : Try[Unit] = { - if(!configuration.usingInstanceRegistry){ + def deregister(configuration: Configuration): Try[Unit] = { + if (!configuration.usingInstanceRegistry) { Failure(new RuntimeException("Cannot deregister from Instance Registry, no Instance Registry available.")) } else { - val id : Long = configuration.assignedID.getOrElse(-1L) + val id: Long = configuration.assignedID.getOrElse(-1L) - val request = HttpRequest(method = HttpMethods.POST, configuration.instanceRegistryUri + s"/deregister?Id=$id") + val request = HttpRequest(method = HttpMethods.POST, configuration.instanceRegistryUri + + s"/instances/$id/deregister") - Await.result(Http(system).singleRequest(request) map {response => - if(response.status == StatusCodes.OK){ + Await.result(Http(system).singleRequest(request.withHeaders(RawHeader("Authorization",s"Bearer ${AuthProvider.generateJwt()}"))) map { response => + if (response.status == StatusCodes.OK) { log.info("Successfully deregistered from Instance Registry.") Success() } @@ -231,17 +238,20 @@ object InstanceRegistry extends JsonSupport with AppLogging Failure(new RuntimeException(s"Failed to deregister from Instance Registry, server returned $statuscode")) } - } recover {case ex => + } recover { case ex => log.warning(s"Failed to deregister to Instance Registry, exception: $ex") Failure(ex) }, Duration.Inf) } } - def postInstance(instance : Instance, uri: String) () : Future[HttpResponse] = { + def postInstance(instance: Instance, uri: String)(): Future[HttpResponse] = { try { - val request = HttpRequest(method = HttpMethods.POST, uri = uri, entity = instance.toJson(instanceFormat).toString()) - Http(system).singleRequest(request) + val request = HttpRequest(method = HttpMethods.POST, uri = uri) + //Use generic name for startup, no id present at this point + Http(system).singleRequest(request + .withHeaders(RawHeader("Authorization",s"Bearer ${AuthProvider.generateJwt(useGenericName = true)}")) + .withEntity(ContentTypes.`application/json`, ByteString(instance.toJson(instanceFormat).toString))) } catch { case dx: DeserializationException => log.warning(s"Failed to deregister to Instance Registry, exception: $dx") @@ -250,77 +260,26 @@ object InstanceRegistry extends JsonSupport with AppLogging } - private def createInstance(id: Option[Long], controlPort : Int, name : String) : Instance = + private def createInstance(id: Option[Long], controlPort: Int, name: String): Instance = Instance(id, InetAddress.getLocalHost.getHostAddress, - controlPort, name, ComponentType.WebApi, None, InstanceState.Running) - - def reportStart(id: String, configuration: Configuration):Try[ResponseEntity] ={ - val request = HttpRequest(method = HttpMethods.GET, configuration.instanceRegistryUri + "/reportStart") - Await.result(Http(system).singleRequest(request) map {response => - if(response.status == StatusCodes.OK){ - Success(response.entity) - } - else { - val statuscode = response.status - log.warning(s"Failed to perform reportStart, server returned $statuscode") - Failure(new RuntimeException(s"Failed to perform reportStart, server returned $statuscode")) - } - } recover {case ex => - log.warning(s"Failed to perform reportStart, exception: $ex") - Failure(new RuntimeException(s"Failed to perform reportStart, server returned, exception: $ex")) - }, Duration.Inf) - } + controlPort, name, ComponentType.WebApi, None, InstanceState.Running, List.empty[String], List.empty[InstanceLink], List.empty[InstanceLink]) - def reportFailure(id: String, configuration: Configuration):Try[ResponseEntity] = { - - val request = HttpRequest(method = HttpMethods.GET, configuration.instanceRegistryUri + "/reportFailure") - Await.result(Http(system).singleRequest(request) map {response => - if(response.status == StatusCodes.OK){ - Success(response.entity) - } - else { - val statuscode = response.status - log.warning(s"Failed to perform reportFailure, server returned $statuscode") - Failure(new RuntimeException(s"Failed to perform reportFailure, server returned $statuscode")) - } - } recover {case ex => - log.warning(s"Failed to perform reportFailure, server returned, exception: $ex") - Failure(new RuntimeException(s"Failed to perform reportFailure, server returned, exception: $ex")) - }, Duration.Inf) - } - - def reportStop(id: String, configuration: Configuration):Try[ResponseEntity] = { - - val request = HttpRequest(method = HttpMethods.GET, configuration.instanceRegistryUri + "/reportStop") - Await.result(Http(system).singleRequest(request) map {response => - if(response.status == StatusCodes.OK){ - Success(response.entity) - } - else { - val statuscode = response.status - log.warning(s"Failed to perform reportStop, server returned $statuscode") - Failure(new RuntimeException(s"Failed to perform reportStop, server returned $statuscode")) - } - } recover {case ex => - log.warning(s"Failed to perform reportStop, server returned, exception: $ex") - Failure(new RuntimeException(s"Failed to perform reportStop, server returned, exception: $ex")) - }, Duration.Inf) - } object ReportOperationType extends Enumeration { - val Start : Value = Value("Start") - val Stop : Value = Value("Stop") - val Failure : Value = Value("Failure") + val Start: Value = Value("Start") + val Stop: Value = Value("Stop") + val Failure: Value = Value("Failure") - def toOperationUriString(operation: ReportOperationType.Value, id: Long) : String = { + def toOperationUriString(operation: ReportOperationType.Value, id: Long): String = { operation match { case Start => - s"/reportStart?Id=$id" + s"/instances/$id/reportStart" case Stop => - s"/reportStop?Id=$id" + s"/instances/$id/reportStop" case _ => - s"/reportFailure?Id=$id" + s"/instances/$id/reportFailure" } } } + } \ No newline at end of file diff --git a/src/main/scala/de/upb/cs/swt/delphi/webapi/Configuration.scala b/src/main/scala/de/upb/cs/swt/delphi/webapi/Configuration.scala index 3874191..9573eae 100644 --- a/src/main/scala/de/upb/cs/swt/delphi/webapi/Configuration.scala +++ b/src/main/scala/de/upb/cs/swt/delphi/webapi/Configuration.scala @@ -19,7 +19,7 @@ package de.upb.cs.swt.delphi.webapi import com.sksamuel.elastic4s.http.ElasticDsl._ import com.sksamuel.elastic4s.{ElasticsearchClientUri, Index, IndexAndType} import de.upb.cs.swt.delphi.instancemanagement.InstanceEnums.{ComponentType, InstanceState} -import de.upb.cs.swt.delphi.instancemanagement.{Instance, InstanceRegistry} +import de.upb.cs.swt.delphi.instancemanagement.{Instance, InstanceLink, InstanceRegistry} import scala.util.{Failure, Success, Try} @@ -49,7 +49,10 @@ class Configuration( //Server and Elasticsearch configuration "Default ElasticSearch instance", ComponentType.ElasticSearch, None, - InstanceState.Running) + InstanceState.Running, + List.empty[String], + List.empty[InstanceLink], + List.empty[InstanceLink]) } val defaultElasticSearchPort: Int = 9200 val defaultElasticSearchHost: String = "elasticsearch://localhost" @@ -85,6 +88,8 @@ class Configuration( //Server and Elasticsearch configuration } lazy val instanceId: Option[Long] = InstanceRegistry.handleInstanceStart(configuration = this) + val jwtSecretKey: String = sys.env.getOrElse("DELPHI_JWT_SECRET","changeme") + } diff --git a/src/main/scala/de/upb/cs/swt/delphi/webapi/DelphiRoutes.scala b/src/main/scala/de/upb/cs/swt/delphi/webapi/DelphiRoutes.scala new file mode 100644 index 0000000..4fcfd8b --- /dev/null +++ b/src/main/scala/de/upb/cs/swt/delphi/webapi/DelphiRoutes.scala @@ -0,0 +1,168 @@ +// Copyright (C) 2018 The Delphi Team. +// See the LICENCE file distributed with this work for additional +// information regarding copyright ownership. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package de.upb.cs.swt.delphi.webapi + +import akka.NotUsed +import akka.actor.ActorRef +import akka.http.scaladsl.model._ +import akka.http.scaladsl.server.Directives._ +import akka.http.scaladsl.server.Route +import akka.pattern.ask +import akka.stream.scaladsl.Source +import de.upb.cs.swt.delphi.webapi.IpLogActor._ +import de.upb.cs.swt.delphi.webapi.StatisticsJson._ +import de.upb.cs.swt.delphi.webapi.artifacts.ArtifactJson._ +import de.upb.cs.swt.delphi.webapi.search.QueryRequestJson._ +import de.upb.cs.swt.delphi.webapi.search.{QueryRequest, SearchError, SearchQuery} +import spray.json._ + +import scala.concurrent.duration._ +import scala.util.{Failure, Success} + + +class DelphiRoutes(requestLimiter: RequestLimitScheduler) extends JsonSupport with AppLogging { + + def routes: Route = { + requestLimiter.acceptOnValidLimit { + apiRoutes + } + } + + def apiRoutes: Route = { + path("version") { + version + } ~ + path("features") { + features + } ~ + path("statistics") { + statistics + } ~ + pathPrefix("search") { + search + } ~ + pathPrefix("retrieve" / Remaining) { identifier => retrieve(identifier) + } + } + + private def version = { + get { + complete { + BuildInfo.version + } + } + } + + private val featureExtractor = new FeatureQuery(configuration) + + private def features = { + get { + parameter('pretty.?) { (pretty) => + complete( + //TODO: Introduce failure concept for feature extractor + prettyPrint(pretty, featureExtractor.featureList.toJson) + ) + } + } + } + + + private def statistics = { + get { + parameter('pretty.?) { (pretty) => + complete { + val result = new StatisticsQuery(configuration).retrieveStandardStatistics + result match { + case Some(stats) => { + prettyPrint(pretty, stats.toJson) + } + case _ => HttpResponse(StatusCodes.InternalServerError) + } + } + } + } + } + + private def retrieve(identifier: String): Route = { + get { + parameter('pretty.?) { (pretty) => + complete( + RetrieveQuery.retrieve(identifier) match { + case Some(result) => prettyPrint(pretty, result.toJson) + case None => HttpResponse(StatusCodes.NotFound) + } + ) + } + } + } + + def search: Route = { + post { + parameter('pretty.?) { (pretty) => + entity(as[QueryRequest]) { input => + log.info(s"Received search query: ${input.query}") + complete( + new SearchQuery(configuration, featureExtractor).search(input) match { + case Success(result) => prettyPrint(pretty, result.toJson) + case Failure(e) => { + e match { + case se: SearchError => { + se.toJson + } + case _ => { + new SearchError("Search query failed").toJson + } + } + } + } + ) + } + } + } + } +} + +object DelphiRoutes { + + private val ipLogActor = system.actorOf(IpLogActor.props) + private val requestLimiter = new RequestLimitScheduler(ipLogActor) + private val routes = new DelphiRoutes(requestLimiter).routes + + def apply(): Route = routes +} + +private final class RequestLimitScheduler(ipLogActor: ActorRef) extends JsonSupport { + Source.tick(0.second, refreshRate, NotUsed) + .runForeach(_ => { + ipLogActor ! Reset + })(materializer) + + def acceptOnValidLimit(apiRoutes: Route): Route = { + extractClientIP { ip => + val promise = (ipLogActor ? Accepted(ip.toString())).mapTo[Boolean] + onSuccess(promise) { + success => + if (success) { + apiRoutes + } else { + val res = Map("msg" -> "Request limit exceeded") + complete(res.toJson) + } + } + } + } +} \ No newline at end of file diff --git a/src/main/scala/de/upb/cs/swt/delphi/webapi/IpLogActor.scala b/src/main/scala/de/upb/cs/swt/delphi/webapi/IpLogActor.scala new file mode 100644 index 0000000..c411953 --- /dev/null +++ b/src/main/scala/de/upb/cs/swt/delphi/webapi/IpLogActor.scala @@ -0,0 +1,63 @@ +// Copyright (C) 2018 The Delphi Team. +// See the LICENCE file distributed with this work for additional +// information regarding copyright ownership. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package de.upb.cs.swt.delphi.webapi + +import akka.actor.{Actor, Props} +import de.upb.cs.swt.delphi.webapi.IpLogActor._ + +import scala.collection.mutable.ListBuffer + +/** + * @author Hariharan. + */ + + +class IpLogActor extends Actor with AppLogging { + private var ipStore = ListBuffer[String]() + + + override def receive: Receive = { + case Add(ip) => { + ipStore += ip + } + case Reset => { + ipStore = ListBuffer[String]() + } + case Accepted(ip) => { + val hostname = ip.split(":")(0) + self ! Add(hostname) + val noOfReqFromIp = ipStore.count(_.equals(hostname)) + val validTotalReq = ipStore.size < maxTotalNoRequest + val validIndividualReq = noOfReqFromIp < maxIndividualReq + val validReq = validTotalReq && validIndividualReq + val accept = if (validReq) true else false + sender() ! accept + } + } +} + + +object IpLogActor { + + final case class Add(ip: String) + + case object Reset + + final case class Accepted(ip: String) + + def props: Props = Props(new IpLogActor) +} diff --git a/src/main/scala/de/upb/cs/swt/delphi/webapi/Server.scala b/src/main/scala/de/upb/cs/swt/delphi/webapi/Server.scala index d59dd29..2f2684b 100644 --- a/src/main/scala/de/upb/cs/swt/delphi/webapi/Server.scala +++ b/src/main/scala/de/upb/cs/swt/delphi/webapi/Server.scala @@ -16,128 +16,28 @@ package de.upb.cs.swt.delphi.webapi -import java.util.concurrent.TimeUnit - -import akka.actor.ActorSystem -import akka.http.scaladsl.model.{HttpResponse, StatusCodes} import akka.http.scaladsl.server.{HttpApp, Route} -import akka.stream.ActorMaterializer -import akka.util.Timeout import de.upb.cs.swt.delphi.instancemanagement.InstanceRegistry -import de.upb.cs.swt.delphi.webapi.artifacts.ArtifactJson._ -import de.upb.cs.swt.delphi.webapi.search.QueryRequestJson._ -import de.upb.cs.swt.delphi.webapi.search.{QueryRequest, SearchQuery} -import spray.json._ - -import scala.concurrent.ExecutionContext -import scala.util.{Failure, Success} /** * Web server configuration for Delphi web API. */ object Server extends HttpApp with JsonSupport with AppLogging { + val delphiRoutes = DelphiRoutes() - implicit val system = ActorSystem("delphi-webapi") - implicit val materializer = ActorMaterializer() - - private implicit val configuration = new Configuration() - private implicit val timeout = Timeout(5, TimeUnit.SECONDS) - - - override def routes: Route = - path("version") { - version - } ~ - path("features") { - features - } ~ - path("statistics") { - statistics - } ~ - pathPrefix("search" ) { search } ~ - pathPrefix("retrieve" / Remaining) { identifier => retrieve(identifier) } - - - private def version = { - get { - complete { - BuildInfo.version - } - } - } - - private val featureExtractor = new FeatureQuery(configuration) - - private def features = { - get { - parameter('pretty.?) { (pretty) => - complete( - //TODO: Introduce failure concept for feature extractor - prettyPrint(pretty, featureExtractor.featureList.toJson) - ) - } - } - } - - private def statistics = { - get { - parameter('pretty.?) { (pretty) => - complete { - val result = new StatisticsQuery(configuration).retrieveStandardStatistics - result match { - case Some(stats) => { - import StatisticsJson._ - prettyPrint(pretty, stats.toJson) - } - case _ => HttpResponse(StatusCodes.InternalServerError) - } - } - } - } + override def routes: Route = { + delphiRoutes } - private def retrieve(identifier: String): Route = { - get { - parameter('pretty.?) { (pretty) => - complete( - RetrieveQuery.retrieve(identifier) match { - case Some(result) => prettyPrint(pretty, result.toJson) - case None => HttpResponse(StatusCodes.NotFound) - } - ) - } - } - } - - def search: Route = { - post { - parameter('pretty.?) { (pretty) => - entity(as[QueryRequest]) { input => - log.info(s"Received search query: ${input.query}") - complete( - new SearchQuery(configuration, featureExtractor).search(input) match { - case Success(result) => prettyPrint(pretty, result.toJson) - case Failure(e) => e.getMessage - } - ) - } - } - } - } def main(args: Array[String]): Unit = { - sys.addShutdownHook({ - log.warning("Received shutdown signal.") - InstanceRegistry.handleInstanceStop(configuration) - }) StartupCheck.check(configuration) Server.startServer(configuration.bindHost, configuration.bindPort, system) - implicit val ec: ExecutionContext = system.dispatcher - val terminationFuture = system.terminate() + InstanceRegistry.handleInstanceStop(configuration) - terminationFuture.onComplete { + system.terminate().onComplete{ sys.exit(0) } } diff --git a/src/main/scala/de/upb/cs/swt/delphi/webapi/authorization/AuthProvider.scala b/src/main/scala/de/upb/cs/swt/delphi/webapi/authorization/AuthProvider.scala new file mode 100644 index 0000000..f03ad12 --- /dev/null +++ b/src/main/scala/de/upb/cs/swt/delphi/webapi/authorization/AuthProvider.scala @@ -0,0 +1,35 @@ +// Copyright (C) 2018 The Delphi Team. +// See the LICENCE file distributed with this work for additional +// information regarding copyright ownership. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package de.upb.cs.swt.delphi.webapi.authorization + +import de.upb.cs.swt.delphi.webapi +import pdi.jwt.{Jwt, JwtAlgorithm, JwtClaim} + +object AuthProvider { + + def generateJwt(validFor: Long = 1, useGenericName: Boolean = false): String = { + val claim = JwtClaim() + .issuedNow + .expiresIn(validFor * 60) + .startsNow + . + ("user_id", if (useGenericName) webapi.configuration.instanceName else s"${webapi.configuration.assignedID.get}") + . + ("user_type", "Component") + + + Jwt.encode(claim, webapi.configuration.jwtSecretKey, JwtAlgorithm.HS256) + } + +} diff --git a/src/main/scala/de/upb/cs/swt/delphi/webapi/package.scala b/src/main/scala/de/upb/cs/swt/delphi/webapi/package.scala new file mode 100644 index 0000000..907f44d --- /dev/null +++ b/src/main/scala/de/upb/cs/swt/delphi/webapi/package.scala @@ -0,0 +1,52 @@ +// Copyright (C) 2018 The Delphi Team. +// See the LICENCE file distributed with this work for additional +// information regarding copyright ownership. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package de.upb.cs.swt.delphi + +import java.util.concurrent.TimeUnit + +import akka.actor.ActorSystem +import akka.stream.ActorMaterializer +import akka.util.Timeout + +import scala.concurrent.ExecutionContext +import scala.concurrent.duration._ + + +package object webapi { + + implicit val system: ActorSystem = ActorSystem("delphi-webapi") + implicit val materializer: ActorMaterializer = ActorMaterializer() + implicit val ec: ExecutionContext = system.dispatcher + + implicit val configuration: Configuration = new Configuration() + + val defaultTimeout = 5 + implicit val timeout: Timeout = Timeout(defaultTimeout, TimeUnit.SECONDS) + + /** + * Maximum no of requests allowed until `refreshRate` is triggered. + */ + val maxTotalNoRequest = 2000 + /** + * Maximum no of requests allowed for an individual until `refreshRate` is triggered. + */ + val maxIndividualReq = 200 + /** + * Used by `Source.tick` to refresh ip log periodically + */ + val refreshRate: FiniteDuration = 5.minutes +} diff --git a/src/main/scala/de/upb/cs/swt/delphi/webapi/search/QueryRequest.scala b/src/main/scala/de/upb/cs/swt/delphi/webapi/search/QueryRequest.scala index 3479577..ffd57df 100644 --- a/src/main/scala/de/upb/cs/swt/delphi/webapi/search/QueryRequest.scala +++ b/src/main/scala/de/upb/cs/swt/delphi/webapi/search/QueryRequest.scala @@ -1,3 +1,18 @@ +// Copyright (C) 2018 The Delphi Team. +// See the LICENCE file distributed with this work for additional +// information regarding copyright ownership. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. package de.upb.cs.swt.delphi.webapi.search -case class QueryRequest (query : String, limit : Option[Int] = Some(50)) +case class QueryRequest (query : String, limit : Option[Int] = Some(defaultFetchSize)) diff --git a/src/main/scala/de/upb/cs/swt/delphi/webapi/search/QueryRequestJson.scala b/src/main/scala/de/upb/cs/swt/delphi/webapi/search/QueryRequestJson.scala index 6ae239e..09699a6 100644 --- a/src/main/scala/de/upb/cs/swt/delphi/webapi/search/QueryRequestJson.scala +++ b/src/main/scala/de/upb/cs/swt/delphi/webapi/search/QueryRequestJson.scala @@ -1,3 +1,18 @@ +// Copyright (C) 2018 The Delphi Team. +// See the LICENCE file distributed with this work for additional +// information regarding copyright ownership. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. package de.upb.cs.swt.delphi.webapi.search import spray.json.DefaultJsonProtocol diff --git a/src/main/scala/de/upb/cs/swt/delphi/webapi/search/SearchQuery.scala b/src/main/scala/de/upb/cs/swt/delphi/webapi/search/SearchQuery.scala index 80c11b1..3caa421 100644 --- a/src/main/scala/de/upb/cs/swt/delphi/webapi/search/SearchQuery.scala +++ b/src/main/scala/de/upb/cs/swt/delphi/webapi/search/SearchQuery.scala @@ -18,18 +18,18 @@ package de.upb.cs.swt.delphi.webapi.search import com.sksamuel.elastic4s.http.ElasticDsl._ import com.sksamuel.elastic4s.http.search.SearchHits -import com.sksamuel.elastic4s.http.{ElasticClient, RequestSuccess} +import com.sksamuel.elastic4s.http.{ElasticClient, RequestFailure, RequestSuccess} import com.sksamuel.elastic4s.searches.queries.{NoopQuery, Query} -import de.upb.cs.swt.delphi.webapi.{Configuration, FeatureQuery} -import de.upb.cs.swt.delphi.webapi.artifacts.ArtifactTransformer +import de.upb.cs.swt.delphi.webapi.artifacts.{Artifact, ArtifactTransformer} import de.upb.cs.swt.delphi.webapi.search.querylanguage._ +import de.upb.cs.swt.delphi.webapi.{Configuration, FeatureQuery} import scala.util.{Failure, Success, Try} class SearchQuery(configuration: Configuration, featureExtractor: FeatureQuery) { private val client = ElasticClient(configuration.elasticsearchClientUri) - private def checkAndExecuteParsedQuery(ast: CombinatorialExpr, limit : Int): Try[SearchHits] = { + private def checkAndExecuteParsedQuery(ast: CombinatorialExpr, limit: Int): Try[SearchHits] = { val fields = collectFieldNames(ast) if (fields.diff(featureExtractor.featureList.toSeq).size > 0) return Failure(new IllegalArgumentException("Unknown field name used.")) @@ -109,16 +109,42 @@ class SearchQuery(configuration: Configuration, featureExtractor: FeatureQuery) } } - def search(query: QueryRequest) = { - val parserResult = new Syntax(query.query).QueryRule.run() - parserResult match { - case Failure(e) => Failure(e) - case Success(ast) => { - checkAndExecuteParsedQuery(ast, query.limit.getOrElse(50)) match { - case Failure(e) => Failure(e) - case Success(hits) => Success(ArtifactTransformer.transformResults(hits)) + def checkValidSize: Option[Int] = { + import elastic4s.extns._ + import elastic4s.extns.ElasticDslExtn._ + val params = Map("include_defaults" -> true) + val query = SettingsRequest("delphi", params) + val res = client.execute { + query + }.await + res match { + case RequestSuccess(_, b, _, _) => { + maxResultSize(b, configuration) + } + case RequestFailure(_, _, _, _) => { + None + } + } + } + + def search(query: QueryRequest): Try[Array[Artifact]] = { + lazy val size = checkValidSize + val validSize = size.exists(query.limit.getOrElse(defaultFetchSize) <= _) + if (validSize) { + val parserResult = new Syntax(query.query).QueryRule.run() + parserResult match { + case Failure(e) => Failure(e) + case Success(ast) => { + checkAndExecuteParsedQuery(ast, query.limit.getOrElse(defaultFetchSize)) match { + case Failure(e) => Failure(e) + case Success(hits) => Success(ArtifactTransformer.transformResults(hits)) + } } } } + else { + val errorMsg = new SearchError(s"Query limit exceeded default limit: ${query.limit}>${size}") + Failure(errorMsg) + } } } diff --git a/src/main/scala/de/upb/cs/swt/delphi/webapi/search/elastic4s/extns/package.scala b/src/main/scala/de/upb/cs/swt/delphi/webapi/search/elastic4s/extns/package.scala new file mode 100644 index 0000000..fe5332a --- /dev/null +++ b/src/main/scala/de/upb/cs/swt/delphi/webapi/search/elastic4s/extns/package.scala @@ -0,0 +1,66 @@ +// Copyright (C) 2018 The Delphi Team. +// See the LICENCE file distributed with this work for additional +// information regarding copyright ownership. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package de.upb.cs.swt.delphi.webapi.search.elastic4s + +import com.sksamuel.elastic4s.http.settings.IndexSettingsResponse +import com.sksamuel.elastic4s.http.{ElasticRequest, Handler} +import com.sksamuel.elastic4s.json.JacksonSupport +import de.upb.cs.swt.delphi.webapi.Configuration + +package object extns { + + case class SettingsRequest(index: String, params: Map[String, Any]) + + trait RichSettingsHandler { + + implicit object RichGetSettings extends Handler[SettingsRequest, IndexSettingsResponse] { + + override def build(request: SettingsRequest): ElasticRequest = { + val endpoint = "/" + request.index + "/_settings" + val req = ElasticRequest("GET", endpoint, request.params) + req + } + } + + } + + def maxResultSize(res: Option[String], config: Configuration): Option[Int] = { + res match { + case Some(j) => { + val custom = s"/${config.esIndex}/settings/index" + val default = s"/${config.esIndex}/defaults/index" + val target = "max_result_window" + val node = JacksonSupport.mapper.readTree(j) + val size = if (node.at(custom).has(target)) { + Some(node.at(custom + "/" + target).asInt()) + } + else { + Some(node.at(default + "/" + target).asInt()) + } + size + } + case None => { + None + } + } + } + + + trait ElasticDslExtn extends RichSettingsHandler + + object ElasticDslExtn extends ElasticDslExtn + +} diff --git a/src/main/scala/de/upb/cs/swt/delphi/webapi/search/package.scala b/src/main/scala/de/upb/cs/swt/delphi/webapi/search/package.scala new file mode 100644 index 0000000..e0b8189 --- /dev/null +++ b/src/main/scala/de/upb/cs/swt/delphi/webapi/search/package.scala @@ -0,0 +1,33 @@ +// Copyright (C) 2018 The Delphi Team. +// See the LICENCE file distributed with this work for additional +// information regarding copyright ownership. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package de.upb.cs.swt.delphi.webapi + +import spray.json._ + +package object search { + + val defaultFetchSize = 50 + + class SearchError(msg: String) extends RuntimeException(msg) with JsonSupport + + implicit val searchErrorWriter = new JsonWriter[SearchError] { + override def write(obj: SearchError): JsValue = { + JsObject("msg" -> JsString(obj.getMessage)) + } + } + +}