diff --git a/build.sbt b/build.sbt
index f16cc42..af4a00c 100644
--- a/build.sbt
+++ b/build.sbt
@@ -49,9 +49,12 @@ libraryDependencies ++= Seq(
   "com.typesafe.akka" %% "akka-stream" % akkaVersion,
   "com.typesafe.akka" %% "akka-stream-testkit" % akkaVersion % Test,
   "com.typesafe.akka" %% "akka-slf4j" % akkaVersion,
+  "com.typesafe.akka" %% "akka-http-spray-json" % "10.1.5", // Akka HTTP modules must all use the same version as akka-http below
   "com.typesafe.akka" %% "akka-http" % "10.1.5"
 )
 
+libraryDependencies += "org.json4s" %% "json4s-jackson" % "3.5.3"
+
 libraryDependencies += "ch.qos.logback" % "logback-classic" % "1.2.3" % Runtime
 
 val elastic4sVersion = "6.3.0"
diff --git a/src/main/resources/reference.conf b/src/main/resources/reference.conf
new file mode 100644
index 0000000..f993f76
--- /dev/null
+++ b/src/main/resources/reference.conf
@@ -0,0 +1,24 @@
+io.swagger.client {
+
+  apiRequest {
+
+    compression {
+      enabled: false
+      size-threshold: 0
+    }
+
+    trust-certificates: true
+
+    connection-timeout: 5000ms
+
+    default-headers {
+      "userAgent": "swagger-client_1.0.0"
+    }
+
+    // lets you define custom HTTP status codes, as in:
+    // { code: 601, reason: "some custom http status code", success: false }
+    custom-codes : []
+  }
+}
+
+spray.can.host-connector.max-redirects = 10
\ No newline at end of file
diff --git a/src/main/scala/de/upb/cs/swt/delphi/crawler/Configuration.scala b/src/main/scala/de/upb/cs/swt/delphi/crawler/Configuration.scala
index e58dbc0..978dc9c 100644
--- a/src/main/scala/de/upb/cs/swt/delphi/crawler/Configuration.scala
+++ b/src/main/scala/de/upb/cs/swt/delphi/crawler/Configuration.scala
@@ -20,13 +20,56 @@ import java.net.URI
 
 import akka.stream.ThrottleMode
 import com.sksamuel.elastic4s.ElasticsearchClientUri
+import de.upb.cs.swt.delphi.crawler.instancemanagement.InstanceRegistry
+import de.upb.cs.swt.delphi.crawler.io.swagger.client.model.Instance
+import de.upb.cs.swt.delphi.crawler.io.swagger.client.model.InstanceEnums.ComponentType
 
 import scala.concurrent.duration._
+import scala.util.{Failure, Success,
Try}
+
+/**
+  * Runtime settings of the crawler. Everything that depends on the Instance Registry is a lazy val,
+  * so the registry is contacted on first access rather than when this class is instantiated.
+  * Environment overrides read here: DELPHI_ELASTIC_URI and DELPHI_IR_URI.
+  */
+class Configuration {
+
+  /** Client URI of the ElasticSearch instance in use, built as "host:port" from [[elasticsearchInstance]]. */
+  lazy val elasticsearchClientUri: ElasticsearchClientUri = ElasticsearchClientUri(
+    elasticsearchInstance.host + ":" + elasticsearchInstance.portNumber)
+
+  /** Instance assigned by the Instance Registry, or a locally configured fallback without id if none was obtained. */
+  lazy val elasticsearchInstance : Instance = InstanceRegistry.retrieveElasticSearchInstance(this).getOrElse(
+    Instance(None, fallbackElasticSearchHost, fallbackElasticSearchPort,
+      "Default ElasticSearch instance", ComponentType.ElasticSearch))
 
-class Configuration {
-  val elasticsearchClientUri: ElasticsearchClientUri = ElasticsearchClientUri(sys.env.getOrElse("DELPHI_ELASTIC_URI","elasticsearch://localhost:9200"))
   val mavenRepoBase: URI = new URI("http://repo1.maven.org/maven2/") // TODO: Create a local demo server "http://localhost:8881/maven2/"
   val controlServerPort : Int = 8882
+
+  // Defaults used when the Instance Registry assigns no instance and DELPHI_ELASTIC_URI yields no value.
+  val defaultElasticSearchPort : Int = 9200
+  val defaultElasticSearchHost : String = "elasticsearch://localhost"
+
+  /** DELPHI_ELASTIC_URI if set; understood shapes are "scheme://host:port" and "scheme://host". */
+  private lazy val elasticUriFromEnv : Option[String] = sys.env.get("DELPHI_ELASTIC_URI")
+
+  /** Port from DELPHI_ELASTIC_URI if it has exactly two colons and a numeric third segment, else the default port. */
+  lazy val fallbackElasticSearchPort : Int = elasticUriFromEnv
+    .filter(_.count(_ == ':') == 2)
+    .flatMap(uri => Try(uri.split(":")(2).toInt).toOption)
+    .getOrElse(defaultElasticSearchPort)
+
+  /**
+    * Host (scheme included) from DELPHI_ELASTIC_URI, else the default host.
+    * A URI without port ("scheme://host") is used as given rather than silently replaced by localhost.
+    */
+  lazy val fallbackElasticSearchHost : String = elasticUriFromEnv.map { uri =>
+    uri.count(_ == ':') match {
+      case 2 => uri.substring(0, uri.lastIndexOf(":")) // scheme://host:port
+      case 1 if uri.contains("://") => uri // scheme://host, the default port applies
+      case _ => defaultElasticSearchHost
+    }
+  }.getOrElse(defaultElasticSearchHost)
 
   val limit : Int = 50
   val throttle : Throttle = Throttle(5, 30 second, 5, ThrottleMode.shaping)
@@ -35,6 +78,15 @@ class Configuration {
 
   val elasticActorPoolSize : Int = 4
   val callGraphStreamPoolSize : Int = 4
 
+  val instanceName : String = "MyCrawlerInstance"
+  val instanceRegistryUri : String = sys.env.getOrElse("DELPHI_IR_URI", "http://localhost:8087")
+
+  /** True iff registration at the Instance Registry succeeded; evaluating it forces [[assignedID]]. */
+  lazy val usingInstanceRegistry : Boolean = assignedID.isDefined
+
+  /** ID handed out by the Instance Registry, None if registering failed; registration is attempted on first access. */
+  lazy val assignedID : Option[Long] = InstanceRegistry.register(this).toOption
+
   case class Throttle(element : Int, per : FiniteDuration, maxBurst : Int, mode : ThrottleMode)
 }
diff --git a/src/main/scala/de/upb/cs/swt/delphi/crawler/Crawler.scala b/src/main/scala/de/upb/cs/swt/delphi/crawler/Crawler.scala
index a995f53..322fe0b 100644
--- a/src/main/scala/de/upb/cs/swt/delphi/crawler/Crawler.scala
+++ b/src/main/scala/de/upb/cs/swt/delphi/crawler/Crawler.scala
@@ -22,6 +22,7 @@ import com.sksamuel.elastic4s.http.ElasticClient
 import de.upb.cs.swt.delphi.crawler.control.Server
 import de.upb.cs.swt.delphi.crawler.discovery.maven.MavenCrawlActor
 import de.upb.cs.swt.delphi.crawler.discovery.maven.MavenCrawlActor.Start
+import de.upb.cs.swt.delphi.crawler.instancemanagement.InstanceRegistry
 import de.upb.cs.swt.delphi.crawler.preprocessing.PreprocessingDispatchActor
 import de.upb.cs.swt.delphi.crawler.processing.{HermesActor, HermesAnalyzer, ProcessingDispatchActor}
 import de.upb.cs.swt.delphi.crawler.storage.ElasticActor
@@ -42,10 +43,11 @@ object Crawler extends App with AppLogging {
   implicit val materializer = ActorMaterializer()
 
   OPALLogger.updateLogger(GlobalLogContext, OPALLogAdapter)
-  HermesAnalyzer.setConfig()
+  //HermesAnalyzer.setConfig()
 
-  sys.addShutdownHook(() => {
+  sys.addShutdownHook({
     log.warning("Received shutdown signal.")
+    InstanceRegistry.deregister(configuration)
     val future = system.terminate()
     Await.result(future, 120.seconds)
   })
@@ -55,6 +57,7 @@ object Crawler extends App with AppLogging {
   Startup.preflightCheck(configuration) match {
     case Success(c) =>
     case Failure(e) => {
+      InstanceRegistry.deregister(configuration)
       system.terminate()
       sys.exit(1)
     }
diff --git a/src/main/scala/de/upb/cs/swt/delphi/crawler/instancemanagement/InstanceRegistry.scala
b/src/main/scala/de/upb/cs/swt/delphi/crawler/instancemanagement/InstanceRegistry.scala
new file mode 100644
index 0000000..1d532af
--- /dev/null
+++ b/src/main/scala/de/upb/cs/swt/delphi/crawler/instancemanagement/InstanceRegistry.scala
@@ -0,0 +1,225 @@
+// Copyright (C) 2018 The Delphi Team.
+// See the LICENCE file distributed with this work for additional
+// information regarding copyright ownership.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package de.upb.cs.swt.delphi.crawler.instancemanagement
+
+import java.net.InetAddress
+
+import akka.actor.ActorSystem
+import akka.http.scaladsl.Http
+import akka.http.scaladsl.marshalling.Marshal
+import akka.http.scaladsl.model._
+import akka.http.scaladsl.unmarshalling.Unmarshal
+import akka.stream.ActorMaterializer
+import de.upb.cs.swt.delphi.crawler.{AppLogging, Configuration, Crawler}
+import de.upb.cs.swt.delphi.crawler.io.swagger.client.model.InstanceEnums.ComponentType
+import de.upb.cs.swt.delphi.crawler.io.swagger.client.model.{Instance, JsonSupport}
+
+import scala.concurrent.duration.{Duration, FiniteDuration, SECONDS}
+import scala.concurrent.{Await, ExecutionContext, Future}
+import scala.util.{Failure, Success, Try}
+
+/**
+  * Client for the Delphi Instance Registry: registers and deregisters this crawler, looks up the
+  * ElasticSearch instance to use and reports whether that instance turned out to be reachable.
+  *
+  * register, retrieveElasticSearchInstance, sendMatchingResult and deregister wait for the registry's
+  * answer for at most `requestTimeout` and report every problem as a `Failure` instead of throwing.
+  */
+object InstanceRegistry extends JsonSupport with AppLogging
+{
+
+  // NOTE(review): both values are captured from Crawler once, when this object is first touched. Crawler
+  // extends App, so confirm nothing reaches InstanceRegistry before Crawler has created them.
+  implicit val system : ActorSystem = Crawler.system
+  implicit val ec : ExecutionContext = system.dispatcher
+  implicit val materializer : ActorMaterializer = Crawler.materializer
+
+  /** Upper bound for one blocking wait, so an unresponsive registry cannot hang startup or shutdown forever. */
+  private val requestTimeout : FiniteDuration = Duration(30, SECONDS)
+
+  /**
+    * Registers this crawler at the Instance Registry.
+    *
+    * @param configuration supplies the registry URI, the control server port and the instance name
+    * @return the ID assigned by the registry, or a `Failure` if registration did not succeed
+    */
+  def register(configuration: Configuration) : Try[Long] = blockingResult[Long] {
+    val instance = createInstance(None, configuration.controlServerPort, configuration.instanceName)
+
+    postInstance(instance, configuration.instanceRegistryUri + "/register")().flatMap[Try[Long]] { response =>
+      if(response.status == StatusCodes.OK) {
+        // Chained with flatMap rather than a nested Await, so no dispatcher thread blocks inside this callback.
+        Unmarshal(response.entity).to[String].map[Try[Long]] { assignedID =>
+          val id = assignedID.toLong
+          log.info(s"Successfully registered at Instance Registry, got ID $id.")
+          Success(id)
+        }.recover { case ex =>
+          log.warning(s"Failed to read assigned ID from Instance Registry, exception: $ex")
+          Failure(ex)
+        }
+      } else {
+        val statuscode = response.status
+        response.discardEntityBytes() // an entity that is never read keeps its pooled connection occupied
+        log.warning(s"Failed to register at Instance Registry, server returned $statuscode")
+        Future.successful(Failure(new RuntimeException(s"Failed to register at Instance Registry, server returned $statuscode")))
+      }
+    }.recover { case ex =>
+      log.warning(s"Failed to register at Instance Registry, exception: $ex")
+      Failure(ex)
+    }
+  }
+
+  /**
+    * Asks the Instance Registry which ElasticSearch instance this crawler should use.
+    *
+    * @param configuration supplies the registry URI and whether registration succeeded
+    * @return the matching instance, or a `Failure` if no registry is in use, none matched or the request failed
+    */
+  def retrieveElasticSearchInstance(configuration: Configuration) : Try[Instance] = {
+    if(!configuration.usingInstanceRegistry) {
+      Failure(new RuntimeException("Cannot get ElasticSearch instance from Instance Registry, no Instance Registry available."))
+    } else blockingResult[Instance] {
+      val request = HttpRequest(method = HttpMethods.GET, uri = configuration.instanceRegistryUri + "/matchingInstance?ComponentType=ElasticSearch")
+
+      Http(system).singleRequest(request).flatMap[Try[Instance]] { response =>
+        response.status match {
+          case StatusCodes.OK =>
+            Unmarshal(response.entity).to[Instance].map[Try[Instance]] { instance =>
+              val elasticIP = instance.host
+              log.info(s"Instance Registry assigned ElasticSearch instance at $elasticIP")
+              Success(instance)
+            }.recover { case ex =>
+              log.warning(s"Failed to read response from Instance Registry, exception: $ex")
+              Failure(ex)
+            }
+          case StatusCodes.NotFound =>
+            response.discardEntityBytes()
+            log.warning(s"No matching instance of type 'ElasticSearch' is present at the instance registry.")
+            Future.successful(Failure(new RuntimeException(s"Instance Registry did not contain matching instance, server returned ${StatusCodes.NotFound}")))
+          case status =>
+            response.discardEntityBytes()
+            log.warning(s"Failed to read matching instance from Instance Registry, server returned $status")
+            Future.successful(Failure(new RuntimeException(s"Failed to read matching instance from Instance Registry, server returned $status")))
+        }
+      }.recover { case ex =>
+        log.warning(s"Failed to request ElasticSearch instance from Instance Registry, exception: $ex ")
+        Failure(ex)
+      }
+    }
+  }
+
+  /**
+    * Reports to the Instance Registry whether the ElasticSearch instance it assigned was reachable.
+    *
+    * @param isElasticSearchReachable outcome of the reachability check
+    * @param configuration supplies the registry URI and the ElasticSearch instance in use
+    * @return `Success` if the registry accepted the result; a `Failure` if no registry is in use, the
+    *         instance was not assigned by the registry (it has no id) or the request failed
+    */
+  def sendMatchingResult(isElasticSearchReachable : Boolean, configuration: Configuration) : Try[Unit] = {
+    if(!configuration.usingInstanceRegistry) {
+      Failure(new RuntimeException("Cannot post matching result to Instance Registry, no Instance Registry available."))
+    } else {
+      configuration.elasticsearchInstance.id match {
+        case None =>
+          Failure(new RuntimeException("The ElasticSearch instance was not assigned by the Instance Registry, so no matching result will be posted."))
+        case Some(idToPost) => blockingResult[Unit] {
+          val request = HttpRequest(
+            method = HttpMethods.POST,
+            uri = configuration.instanceRegistryUri + s"/matchingResult?Id=$idToPost&MatchingSuccessful=$isElasticSearchReachable")
+
+          Http(system).singleRequest(request).map[Try[Unit]] { response =>
+            response.discardEntityBytes()
+            if(response.status == StatusCodes.OK) {
+              log.info("Successfully posted matching result to Instance Registry.")
+              Success(())
+            } else {
+              val statuscode = response.status
+              log.warning(s"Failed to post matching result to Instance Registry, server returned $statuscode")
+              Failure(new RuntimeException(s"Failed to post matching result to Instance Registry, server returned $statuscode"))
+            }
+          }.recover { case ex =>
+            log.warning(s"Failed to post matching result to Instance Registry, exception: $ex")
+            Failure(new RuntimeException(s"Failed to post matching result to Instance Registry, exception: $ex"))
+          }
+        }
+      }
+    }
+  }
+
+  /**
+    * Deregisters this crawler from the Instance Registry.
+    *
+    * @param configuration supplies the registry URI and the ID assigned during registration
+    * @return `Success` if the registry confirmed, or a `Failure` if no ID was assigned or the request failed
+    */
+  def deregister(configuration: Configuration) : Try[Unit] = {
+    configuration.assignedID match {
+      case None =>
+        Failure(new RuntimeException("Cannot deregister from Instance Registry, no Instance Registry available."))
+      case Some(id) => blockingResult[Unit] {
+        val request = HttpRequest(method = HttpMethods.POST, uri = configuration.instanceRegistryUri + s"/deregister?Id=$id")
+
+        Http(system).singleRequest(request).map[Try[Unit]] { response =>
+          response.discardEntityBytes()
+          if(response.status == StatusCodes.OK) {
+            log.info("Successfully deregistered from Instance Registry.")
+            Success(())
+          } else {
+            val statuscode = response.status
+            log.warning(s"Failed to deregister from Instance Registry, server returned $statuscode")
+            Failure(new RuntimeException(s"Failed to deregister from Instance Registry, server returned $statuscode"))
+          }
+        }.recover { case ex =>
+          log.warning(s"Failed to deregister from Instance Registry, exception: $ex")
+          Failure(ex)
+        }
+      }
+    }
+  }
+
+  /**
+    * Marshals the instance to a request entity and POSTs it to the given URI without blocking.
+    *
+    * The empty second parameter list is kept as is so that existing call sites keep compiling.
+    *
+    * @param instance payload to send
+    * @param uri absolute target URI
+    * @return the raw HTTP response; the caller has to consume or discard its entity
+    */
+  def postInstance(instance : Instance, uri: String) () : Future[HttpResponse] =
+    Marshal(instance).to[RequestEntity] flatMap { entity =>
+      val request = HttpRequest(method = HttpMethods.POST, uri = uri, entity = entity)
+      Http(system).singleRequest(request)
+    }
+
+  /** Describes this crawler as an [[Instance]] of type Crawler at the address returned by InetAddress.getLocalHost. */
+  private def createInstance(id: Option[Long], controlPort : Int, name : String) : Instance =
+    Instance(id, InetAddress.getLocalHost.getHostAddress, controlPort, name, ComponentType.Crawler)
+
+  /**
+    * Evaluates `operation` and waits at most `requestTimeout` for its outcome.
+    * Exceptions thrown while setting the request up, and the timeout itself, end up in the returned `Failure`.
+    */
+  private def blockingResult[T](operation : => Future[Try[T]]) : Try[T] =
+    Try(Await.result(operation, requestTimeout)) match {
+      case Success(outcome) => outcome
+      case Failure(ex) =>
+        log.warning(s"Request to Instance Registry did not complete, exception: $ex")
+        Failure(ex)
+    }
+}
diff --git a/src/main/scala/de/upb/cs/swt/delphi/crawler/io/swagger/client/model/Instance.scala b/src/main/scala/de/upb/cs/swt/delphi/crawler/io/swagger/client/model/Instance.scala
new file mode 100644
index 0000000..252e4d2
--- /dev/null
+++ b/src/main/scala/de/upb/cs/swt/delphi/crawler/io/swagger/client/model/Instance.scala
@@ -0,0 +1,52 @@
+/**
+ * NOTE: This class is auto generated by the akka-scala (beta) swagger code generator program.
+ * https://github.com/swagger-api/swagger-codegen
+ * For any issue or feedback, please open a ticket via https://github.com/swagger-api/swagger-codegen/issues/new
+ */
+
+package de.upb.cs.swt.delphi.crawler.io.swagger.client.model
+
+import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport
+import spray.json._
+
+/** spray-json formats for [[Instance]] and its component type, usable for akka-http (un)marshalling. */
+trait JsonSupport extends SprayJsonSupport with DefaultJsonProtocol {
+  /** Writes a component type as its enumeration name and reads it back; unknown names are rejected. */
+  implicit val componentTypeFormat : JsonFormat[InstanceEnums.ComponentType] = new JsonFormat[InstanceEnums.ComponentType] {
+    def write(compType : InstanceEnums.ComponentType) : JsValue = JsString(compType.toString)
+
+    def read(value: JsValue) : InstanceEnums.ComponentType = value match {
+      // Looked up in the enumeration itself, so values added to ComponentType need no change here.
+      case JsString(s) => InstanceEnums.ComponentType.values.find(_.toString == s).getOrElse(
+        deserializationError(s"Unexpected string value $s for component type."))
+      case y => deserializationError(s"Unexpected type $y while deserializing component type.")
+    }
+  }
+  /** Format for the five fields of [[Instance]]; relies on componentTypeFormat declared above. */
+  implicit val instanceFormat : RootJsonFormat[Instance] = jsonFormat5(Instance)
+}
+
+/** A component instance (id, host, port, name, type) in the shape (de)serialized by [[JsonSupport]]. */
+final case class Instance (
+  id: Option[Long],
+  host: String,
+  portNumber: Int,
+  name: String,
+  /* Component Type */
+  componentType: InstanceEnums.ComponentType
+)
+
+/** Enumerations referenced by [[Instance]]. */
+object InstanceEnums {
+
+  type ComponentType = ComponentType.Value
+  object ComponentType extends Enumeration {
+    val Crawler = Value("Crawler")
+    val WebApi = Value("WebApi")
+    val WebApp = Value("WebApp")
+    val DelphiManagement = Value("DelphiManagement")
+    val ElasticSearch = Value("ElasticSearch")
+  }
+
+}
+
diff --git a/src/main/scala/de/upb/cs/swt/delphi/crawler/storage/ElasticIndexPreflightCheck.scala b/src/main/scala/de/upb/cs/swt/delphi/crawler/storage/ElasticIndexPreflightCheck.scala
index 5d168aa..2c8a841 100644
--- a/src/main/scala/de/upb/cs/swt/delphi/crawler/storage/ElasticIndexPreflightCheck.scala
+++ b/src/main/scala/de/upb/cs/swt/delphi/crawler/storage/ElasticIndexPreflightCheck.scala @@ -20,6 +20,7 @@ import akka.actor.ActorSystem import akka.http.scaladsl.model.StatusCodes import com.sksamuel.elastic4s.http.ElasticDsl._ import com.sksamuel.elastic4s.http.{ElasticClient, RequestFailure, RequestSuccess} +import de.upb.cs.swt.delphi.crawler.instancemanagement.InstanceRegistry import de.upb.cs.swt.delphi.crawler.{Configuration, PreflightCheck} import scala.concurrent.duration.Duration @@ -46,7 +47,10 @@ object ElasticIndexPreflightCheck extends PreflightCheck with ElasticIndexMainte case false => migrateIndex(configuration) // This needs some work } } - case RequestFailure(_, _, _, e) => Failure(new ElasticException(e)) + case RequestFailure(_, _, _, e) => { + InstanceRegistry.sendMatchingResult(false, configuration) + Failure(new ElasticException(e)) + } } } } diff --git a/src/main/scala/de/upb/cs/swt/delphi/crawler/storage/ElasticReachablePreflightCheck.scala b/src/main/scala/de/upb/cs/swt/delphi/crawler/storage/ElasticReachablePreflightCheck.scala index 7609de9..84db188 100644 --- a/src/main/scala/de/upb/cs/swt/delphi/crawler/storage/ElasticReachablePreflightCheck.scala +++ b/src/main/scala/de/upb/cs/swt/delphi/crawler/storage/ElasticReachablePreflightCheck.scala @@ -17,8 +17,9 @@ package de.upb.cs.swt.delphi.crawler.storage import akka.actor.ActorSystem -import com.sksamuel.elastic4s.http.ElasticClient import com.sksamuel.elastic4s.http.ElasticDsl._ +import com.sksamuel.elastic4s.http.ElasticClient +import de.upb.cs.swt.delphi.crawler.instancemanagement.InstanceRegistry import de.upb.cs.swt.delphi.crawler.{Configuration, PreflightCheck} import scala.concurrent.duration._ @@ -44,8 +45,14 @@ object ElasticReachablePreflightCheck extends PreflightCheck { val f = (client.execute { nodeInfo() - } map { i => Success(configuration) - } recover { case e => Failure(e) + } map { i => { + InstanceRegistry.sendMatchingResult(isElasticSearchReachable = true, 
configuration) + Success(configuration) + } + } recover { case e => + InstanceRegistry.sendMatchingResult(isElasticSearchReachable = false, configuration) + Failure(e) + }).andThen { case _ => client.close() }