From 6e6a675232d255c6d7c38b0f04e1a143d47f1191 Mon Sep 17 00:00:00 2001 From: Johannes Duesing Date: Wed, 1 Aug 2018 16:52:51 +0200 Subject: [PATCH 01/20] Implemented a Preflight check connecting to the Instance Registry Initial functionality to verify the instance registry is available on startup --- .../cs/swt/delphi/crawler/Configuration.scala | 1 + .../upb/cs/swt/delphi/crawler/Startup.scala | 3 +- .../IRReachablePreflightCheck.scala | 28 ++++++++ .../crawler/tools/BlockingHttpClient.scala | 65 +++++++++++++++++++ 4 files changed, 96 insertions(+), 1 deletion(-) create mode 100644 src/main/scala/de/upb/cs/swt/delphi/crawler/instancemanagement/IRReachablePreflightCheck.scala create mode 100644 src/main/scala/de/upb/cs/swt/delphi/crawler/tools/BlockingHttpClient.scala diff --git a/src/main/scala/de/upb/cs/swt/delphi/crawler/Configuration.scala b/src/main/scala/de/upb/cs/swt/delphi/crawler/Configuration.scala index 080a628..310f7b2 100644 --- a/src/main/scala/de/upb/cs/swt/delphi/crawler/Configuration.scala +++ b/src/main/scala/de/upb/cs/swt/delphi/crawler/Configuration.scala @@ -13,6 +13,7 @@ class Configuration { val controlServerPort : Int = 8882 val throttle : Throttle = Throttle(10, 10 millis, 10, ThrottleMode.shaping) val limit : Int = 50 + val instanceRegistryUri : String = sys.env.getOrElse("DELPHI_IR_URI", "http://localhost:9300") case class Throttle(element : Int, per : FiniteDuration, maxBurst : Int, mode : ThrottleMode) } diff --git a/src/main/scala/de/upb/cs/swt/delphi/crawler/Startup.scala b/src/main/scala/de/upb/cs/swt/delphi/crawler/Startup.scala index 445a300..b4afbba 100644 --- a/src/main/scala/de/upb/cs/swt/delphi/crawler/Startup.scala +++ b/src/main/scala/de/upb/cs/swt/delphi/crawler/Startup.scala @@ -1,6 +1,7 @@ package de.upb.cs.swt.delphi.crawler import akka.actor.ActorSystem +import de.upb.cs.swt.delphi.crawler.instancemanagement.IRReachablePreflightCheck import de.upb.cs.swt.delphi.crawler.storage.{ElasticIndexPreflightCheck, ElasticReachablePreflightCheck} import scala.util.{Failure, Success, Try} @@ -14,7 +15,7 @@ object Startup extends AppLogging { def preflightCheck(configuration: Configuration)(implicit system : ActorSystem) : Try[Configuration] = { log.info("Performing pre-flight checks") - val checks = Seq(ElasticReachablePreflightCheck, ElasticIndexPreflightCheck) + val checks = Seq(IRReachablePreflightCheck, ElasticReachablePreflightCheck, ElasticIndexPreflightCheck) checks.foreach(p => { val result = p.check(configuration) diff --git a/src/main/scala/de/upb/cs/swt/delphi/crawler/instancemanagement/IRReachablePreflightCheck.scala b/src/main/scala/de/upb/cs/swt/delphi/crawler/instancemanagement/IRReachablePreflightCheck.scala new file mode 100644 index 0000000..997f6eb --- /dev/null +++ b/src/main/scala/de/upb/cs/swt/delphi/crawler/instancemanagement/IRReachablePreflightCheck.scala @@ -0,0 +1,28 @@ +package de.upb.cs.swt.delphi.crawler.instancemanagement + +import java.net.InetAddress + +import akka.actor.ActorSystem +import scala.util.{Failure, Success} +import de.upb.cs.swt.delphi.crawler.tools.BlockingHttpClient +import de.upb.cs.swt.delphi.crawler.{Configuration, PreflightCheck} +import play.api.libs.json.Json + +import scala.util.Try + +object IRReachablePreflightCheck extends PreflightCheck{ + override def check(configuration: Configuration)(implicit system: ActorSystem): Try[Configuration] = { + val fieldList = List( + ("Type","Crawler"), + ("IP",InetAddress.getLocalHost().getHostName()), + ("port",configuration.controlServerPort.toString()) + ) + val json = Json.toJson(fieldList) + + BlockingHttpClient.executePost("/register", json.toString(), response => { + Success(configuration) + })(configuration) + + Failure(new IllegalArgumentException(s"Cannot reach instance registry at ${configuration.instanceRegistryUri}")) + } +} diff --git a/src/main/scala/de/upb/cs/swt/delphi/crawler/tools/BlockingHttpClient.scala b/src/main/scala/de/upb/cs/swt/delphi/crawler/tools/BlockingHttpClient.scala new file mode 100644 index 0000000..950fb21 --- /dev/null +++ b/src/main/scala/de/upb/cs/swt/delphi/crawler/tools/BlockingHttpClient.scala @@ -0,0 +1,65 @@ +package de.upb.cs.swt.delphi.crawler.tools + +import akka.actor.ActorSystem +import akka.http.scaladsl.Http +import akka.http.scaladsl.model._ +import akka.stream.{ActorMaterializer, ActorMaterializerSettings} +import akka.util.ByteString +import de.upb.cs.swt.delphi.crawler.Configuration + +import scala.concurrent.{Await, Future} +import scala.concurrent.duration.Duration +import scala.util.{Failure, Success, Try} + +/*** + * A blocking http client implemented using Akka HTTP + */ +object BlockingHttpClient { + + def doRequest(uri : Uri, method : HttpMethod, jsonData : String) : Try[String] = { + implicit val system = ActorSystem() + implicit val executionContext = system.dispatcher + implicit val materializer: ActorMaterializer = ActorMaterializer(ActorMaterializerSettings(system)) + val httpActor = Http(system) + + try { + println("Creating request...") + val request = HttpRequest(method = method, uri = uri, entity = ByteString(jsonData)) + println("Executing request...") + val req: Future[HttpResponse] = httpActor.singleRequest(request) + println("Awatiting response...") + Await.result(req, Duration.Inf) + + println("Parsing response...") + + val f = req.value.get.get.entity.dataBytes.runFold(ByteString(""))(_ ++ _) + Await.result(f, Duration.Inf) + + Success(f.value.get.get.utf8String) + } catch { + case e : Exception => { + httpActor.shutdownAllConnectionPools() + system.terminate() + Failure(e) + } + } + + } + + def executeRequest(target: String, method : HttpMethod, jsonData: String, onSuccess: String => Unit)(config: Configuration) : Unit = { + + val uri = Uri(config.instanceRegistryUri) + println(s"Contacting server ${config.instanceRegistryUri}...") + val resp = BlockingHttpClient.doRequest(uri.withPath(uri.path + target), method, jsonData) + + resp match { + case Success(res) => onSuccess(res) + case Failure(_) => println(s"Could not reach server ${config.instanceRegistryUri}.") + } + } + + def executeGet(target: String, jsonData: String, onSuccess: String => Unit)(config: Configuration): Unit = executeRequest(target, HttpMethods.GET, jsonData, onSuccess)(config) + def executePost(target: String, jsonData: String, onSuccess: String => Unit)(config: Configuration): Unit = executeRequest(target, HttpMethods.POST, jsonData, onSuccess)(config) + + +} From 7edf235aa210ccea0d940d8272c0b585605f7aa2 Mon Sep 17 00:00:00 2001 From: Johannes Duesing Date: Tue, 7 Aug 2018 15:49:04 +0200 Subject: [PATCH 02/20] ElasticSearch instance can now be set to the one responded by th IR Changed infrastructure of Configuration to support the IR concept. --- .../cs/swt/delphi/crawler/Configuration.scala | 19 +++++- .../upb/cs/swt/delphi/crawler/Startup.scala | 3 +- .../IRReachablePreflightCheck.scala | 28 -------- .../instancemanagement/InstanceRegistry.scala | 25 +++++++ .../ElasticReachablePreflightCheck.scala | 11 +++- .../crawler/tools/BlockingHttpClient.scala | 65 ------------------- 6 files changed, 52 insertions(+), 99 deletions(-) delete mode 100644 src/main/scala/de/upb/cs/swt/delphi/crawler/instancemanagement/IRReachablePreflightCheck.scala create mode 100644 src/main/scala/de/upb/cs/swt/delphi/crawler/instancemanagement/InstanceRegistry.scala delete mode 100644 src/main/scala/de/upb/cs/swt/delphi/crawler/tools/BlockingHttpClient.scala diff --git a/src/main/scala/de/upb/cs/swt/delphi/crawler/Configuration.scala b/src/main/scala/de/upb/cs/swt/delphi/crawler/Configuration.scala index 310f7b2..500dd68 100644 --- a/src/main/scala/de/upb/cs/swt/delphi/crawler/Configuration.scala +++ b/src/main/scala/de/upb/cs/swt/delphi/crawler/Configuration.scala @@ -4,17 +4,32 @@ import java.net.URI import akka.stream.ThrottleMode import com.sksamuel.elastic4s.ElasticsearchClientUri +import de.upb.cs.swt.delphi.crawler.instancemanagement.InstanceRegistry import scala.concurrent.duration._ +import scala.util.{Failure, Success} + +class Configuration { + + lazy val elasticsearchClientUri: ElasticsearchClientUri = ElasticsearchClientUri(InstanceRegistry.retrieveElasticSearchInstance(this) match { + case Success(elasticIP) => elasticIP + case Failure(_) => sys.env.getOrElse("DELPHI_ELASTIC_URI","elasticsearch://localhost:9200") + }) -class Configuration { - val elasticsearchClientUri: ElasticsearchClientUri = ElasticsearchClientUri(sys.env.getOrElse("DELPHI_ELASTIC_URI","elasticsearch://localhost:9200")) val mavenRepoBase: URI = new URI("http://repo1.maven.org/maven2/") // TODO: Create a local demo server "http://localhost:8881/maven2/" val controlServerPort : Int = 8882 val throttle : Throttle = Throttle(10, 10 millis, 10, ThrottleMode.shaping) val limit : Int = 50 val instanceRegistryUri : String = sys.env.getOrElse("DELPHI_IR_URI", "http://localhost:9300") + lazy val usingInstanceRegistry = InstanceRegistry.register("MyCrawlerInstance",this) match { + case Success(_) => true + case Failure(_) => { + println(s"Failed to connect to Instance Registry at ${instanceRegistryUri}. Using default configuration instead.") + false + } + } + case class Throttle(element : Int, per : FiniteDuration, maxBurst : Int, mode : ThrottleMode) } diff --git a/src/main/scala/de/upb/cs/swt/delphi/crawler/Startup.scala b/src/main/scala/de/upb/cs/swt/delphi/crawler/Startup.scala index b4afbba..445a300 100644 --- a/src/main/scala/de/upb/cs/swt/delphi/crawler/Startup.scala +++ b/src/main/scala/de/upb/cs/swt/delphi/crawler/Startup.scala @@ -1,7 +1,6 @@ package de.upb.cs.swt.delphi.crawler import akka.actor.ActorSystem -import de.upb.cs.swt.delphi.crawler.instancemanagement.IRReachablePreflightCheck import de.upb.cs.swt.delphi.crawler.storage.{ElasticIndexPreflightCheck, ElasticReachablePreflightCheck} import scala.util.{Failure, Success, Try} @@ -15,7 +14,7 @@ object Startup extends AppLogging { def preflightCheck(configuration: Configuration)(implicit system : ActorSystem) : Try[Configuration] = { log.info("Performing pre-flight checks") - val checks = Seq(IRReachablePreflightCheck, ElasticReachablePreflightCheck, ElasticIndexPreflightCheck) + val checks = Seq(ElasticReachablePreflightCheck, ElasticIndexPreflightCheck) checks.foreach(p => { val result = p.check(configuration) diff --git a/src/main/scala/de/upb/cs/swt/delphi/crawler/instancemanagement/IRReachablePreflightCheck.scala b/src/main/scala/de/upb/cs/swt/delphi/crawler/instancemanagement/IRReachablePreflightCheck.scala deleted file mode 100644 index 997f6eb..0000000 --- a/src/main/scala/de/upb/cs/swt/delphi/crawler/instancemanagement/IRReachablePreflightCheck.scala +++ /dev/null @@ -1,28 +0,0 @@ -package de.upb.cs.swt.delphi.crawler.instancemanagement - -import java.net.InetAddress - -import akka.actor.ActorSystem -import scala.util.{Failure, Success} -import de.upb.cs.swt.delphi.crawler.tools.BlockingHttpClient -import de.upb.cs.swt.delphi.crawler.{Configuration, PreflightCheck} -import play.api.libs.json.Json - -import scala.util.Try - -object IRReachablePreflightCheck extends PreflightCheck{ - override def check(configuration: Configuration)(implicit system: ActorSystem): Try[Configuration] = { - val fieldList = List( - ("Type","Crawler"), - ("IP",InetAddress.getLocalHost().getHostName()), - ("port",configuration.controlServerPort.toString()) - ) - val json = Json.toJson(fieldList) - - BlockingHttpClient.executePost("/register", json.toString(), response => { - Success(configuration) - })(configuration) - - Failure(new IllegalArgumentException(s"Cannot reach instance registry at ${configuration.instanceRegistryUri}")) - } -} diff --git a/src/main/scala/de/upb/cs/swt/delphi/crawler/instancemanagement/InstanceRegistry.scala b/src/main/scala/de/upb/cs/swt/delphi/crawler/instancemanagement/InstanceRegistry.scala new file mode 100644 index 0000000..de9044b --- /dev/null +++ b/src/main/scala/de/upb/cs/swt/delphi/crawler/instancemanagement/InstanceRegistry.scala @@ -0,0 +1,25 @@ +package de.upb.cs.swt.delphi.crawler.instancemanagement + +import de.upb.cs.swt.delphi.crawler.Configuration + +import scala.util.{Failure, Success, Try} + +object InstanceRegistry +{ + def register(Name: String, configuration: Configuration) : Try[Configuration] = { + //TODO: Call generated API here + Success(configuration) + } + + def retrieveElasticSearchInstance(configuration: Configuration) : Try[String] = { + if(!configuration.usingInstanceRegistry) Failure + //TODO: Call generated API here + Success("") + } + + def sendMatchingResult(isElasticSearchReachable : Boolean, configuration: Configuration) : Try[Unit] = { + if(!configuration.usingInstanceRegistry) Failure + //TODO: Call generated API here + Success() + } +} diff --git a/src/main/scala/de/upb/cs/swt/delphi/crawler/storage/ElasticReachablePreflightCheck.scala b/src/main/scala/de/upb/cs/swt/delphi/crawler/storage/ElasticReachablePreflightCheck.scala index d85d018..636ed99 100644 --- a/src/main/scala/de/upb/cs/swt/delphi/crawler/storage/ElasticReachablePreflightCheck.scala +++ b/src/main/scala/de/upb/cs/swt/delphi/crawler/storage/ElasticReachablePreflightCheck.scala @@ -3,6 +3,7 @@ package de.upb.cs.swt.delphi.crawler.storage import akka.actor.ActorSystem import com.sksamuel.elastic4s.http.ElasticDsl._ import com.sksamuel.elastic4s.http.{ElasticClient, HttpClient} +import de.upb.cs.swt.delphi.crawler.instancemanagement.InstanceRegistry import de.upb.cs.swt.delphi.crawler.{Configuration, PreflightCheck} import scala.concurrent.duration._ @@ -16,8 +17,14 @@ object ElasticReachablePreflightCheck extends PreflightCheck { val f = (client.execute { nodeInfo() - } map { i => Success(configuration) - } recover { case e => Failure(e) + } map { i => { + if(configuration.usingInstanceRegistry) InstanceRegistry.sendMatchingResult(true, configuration) + Success(configuration) + } + } recover { case e => { + if(configuration.usingInstanceRegistry) InstanceRegistry.sendMatchingResult(false, configuration) + Failure(e) + } }).andThen { case _ => client.close() } diff --git a/src/main/scala/de/upb/cs/swt/delphi/crawler/tools/BlockingHttpClient.scala b/src/main/scala/de/upb/cs/swt/delphi/crawler/tools/BlockingHttpClient.scala deleted file mode 100644 index 950fb21..0000000 --- a/src/main/scala/de/upb/cs/swt/delphi/crawler/tools/BlockingHttpClient.scala +++ /dev/null @@ -1,65 +0,0 @@ -package de.upb.cs.swt.delphi.crawler.tools - -import akka.actor.ActorSystem -import akka.http.scaladsl.Http -import akka.http.scaladsl.model._ -import akka.stream.{ActorMaterializer, ActorMaterializerSettings} -import akka.util.ByteString -import de.upb.cs.swt.delphi.crawler.Configuration - -import scala.concurrent.{Await, Future} -import scala.concurrent.duration.Duration -import scala.util.{Failure, Success, Try} - -/*** - * A blocking http client implemented using Akka HTTP - */ -object BlockingHttpClient { - - def doRequest(uri : Uri, method : HttpMethod, jsonData : String) : Try[String] = { - implicit val system = ActorSystem() - implicit val executionContext = system.dispatcher - implicit val materializer: ActorMaterializer = ActorMaterializer(ActorMaterializerSettings(system)) - val httpActor = Http(system) - - try { - println("Creating request...") - val request = HttpRequest(method = method, uri = uri, entity = ByteString(jsonData)) - println("Executing request...") - val req: Future[HttpResponse] = httpActor.singleRequest(request) - println("Awatiting response...") - Await.result(req, Duration.Inf) - - println("Parsing response...") - - val f = req.value.get.get.entity.dataBytes.runFold(ByteString(""))(_ ++ _) - Await.result(f, Duration.Inf) - - Success(f.value.get.get.utf8String) - } catch { - case e : Exception => { - httpActor.shutdownAllConnectionPools() - system.terminate() - Failure(e) - } - } - - } - - def executeRequest(target: String, method : HttpMethod, jsonData: String, onSuccess: String => Unit)(config: Configuration) : Unit = { - - val uri = Uri(config.instanceRegistryUri) - println(s"Contacting server ${config.instanceRegistryUri}...") - val resp = BlockingHttpClient.doRequest(uri.withPath(uri.path + target), method, jsonData) - - resp match { - case Success(res) => onSuccess(res) - case Failure(_) => println(s"Could not reach server ${config.instanceRegistryUri}.") - } - } - - def executeGet(target: String, jsonData: String, onSuccess: String => Unit)(config: Configuration): Unit = executeRequest(target, HttpMethods.GET, jsonData, onSuccess)(config) - def executePost(target: String, jsonData: String, onSuccess: String => Unit)(config: Configuration): Unit = executeRequest(target, HttpMethods.POST, jsonData, onSuccess)(config) - - -} From ab2bce3a39d287bc769fede9953adaa83a2412b3 Mon Sep 17 00:00:00 2001 From: Johannes Duesing Date: Tue, 21 Aug 2018 17:01:27 +0200 Subject: [PATCH 03/20] Included client API code from swagger to connect to IR Still yields runtime exception, dependency 'spray-client' is no compatible with scala 2.12 . For scala 2.12 it it contents are part of akka-http-core, this needs to be upgraded. --- build.sbt | 5 +- src/main/resources/reference.conf | 24 ++ .../cs/swt/delphi/crawler/Configuration.scala | 2 +- .../instancemanagement/InstanceRegistry.scala | 15 +- .../swagger/client/api/EnumsSerializers.scala | 45 +++ .../io/swagger/client/api/InstanceApi.scala | 113 ++++++ .../io/swagger/client/core/ApiInvoker.scala | 329 ++++++++++++++++++ .../io/swagger/client/core/ApiRequest.scala | 59 ++++ .../io/swagger/client/core/ApiSettings.scala | 36 ++ .../io/swagger/client/core/requests.scala | 187 ++++++++++ .../io/swagger/client/model/Instance.scala | 33 ++ .../io/swagger/client/model/InstanceID.scala | 15 + 12 files changed, 858 insertions(+), 5 deletions(-) create mode 100644 src/main/resources/reference.conf create mode 100644 src/main/scala/de/upb/cs/swt/delphi/crawler/io/swagger/client/api/EnumsSerializers.scala create mode 100644 src/main/scala/de/upb/cs/swt/delphi/crawler/io/swagger/client/api/InstanceApi.scala create mode 100644 src/main/scala/de/upb/cs/swt/delphi/crawler/io/swagger/client/core/ApiInvoker.scala create mode 100644 src/main/scala/de/upb/cs/swt/delphi/crawler/io/swagger/client/core/ApiRequest.scala create mode 100644 src/main/scala/de/upb/cs/swt/delphi/crawler/io/swagger/client/core/ApiSettings.scala create mode 100644 src/main/scala/de/upb/cs/swt/delphi/crawler/io/swagger/client/core/requests.scala create mode 100644 src/main/scala/de/upb/cs/swt/delphi/crawler/io/swagger/client/model/Instance.scala create mode 100644 src/main/scala/de/upb/cs/swt/delphi/crawler/io/swagger/client/model/InstanceID.scala diff --git a/build.sbt b/build.sbt index 0008eff..941cd18 100644 --- a/build.sbt +++ b/build.sbt @@ -32,7 +32,10 @@ libraryDependencies ++= Seq( "com.typesafe.akka" %% "akka-stream" % akkaVersion, "com.typesafe.akka" %% "akka-stream-testkit" % akkaVersion % Test, "com.typesafe.akka" %% "akka-slf4j" % akkaVersion, - "com.typesafe.akka" %% "akka-http" % "10.1.3" + "com.typesafe.akka" %% "akka-http" % "10.1.3", + "org.json4s" %% "json4s-jackson" % "3.6.0", + "io.swagger" % "swagger-core" % "1.5.21", + "io.spray" % "spray-client" % "1.3.1" ) libraryDependencies += "ch.qos.logback" % "logback-classic" % "1.1.3" % Runtime diff --git a/src/main/resources/reference.conf b/src/main/resources/reference.conf new file mode 100644 index 0000000..f993f76 --- /dev/null +++ b/src/main/resources/reference.conf @@ -0,0 +1,24 @@ +io.swagger.client { + + apiRequest { + + compression { + enabled: false + size-threshold: 0 + } + + trust-certificates: true + + connection-timeout: 5000ms + + default-headers { + "userAgent": "swagger-client_1.0.0" + } + + // let you define custom http status code, as in : + // { code: 601, reason: "some custom http status code", success: false } + custom-codes : [] + } +} + +spray.can.host-connector.max-redirects = 10 \ No newline at end of file diff --git a/src/main/scala/de/upb/cs/swt/delphi/crawler/Configuration.scala b/src/main/scala/de/upb/cs/swt/delphi/crawler/Configuration.scala index 500dd68..6f5ebea 100644 --- a/src/main/scala/de/upb/cs/swt/delphi/crawler/Configuration.scala +++ b/src/main/scala/de/upb/cs/swt/delphi/crawler/Configuration.scala @@ -20,7 +20,7 @@ class Configuration { val controlServerPort : Int = 8882 val throttle : Throttle = Throttle(10, 10 millis, 10, ThrottleMode.shaping) val limit : Int = 50 - val instanceRegistryUri : String = sys.env.getOrElse("DELPHI_IR_URI", "http://localhost:9300") + val instanceRegistryUri : String = sys.env.getOrElse("DELPHI_IR_URI", "http://localhost:9500") lazy val usingInstanceRegistry = InstanceRegistry.register("MyCrawlerInstance",this) match { case Success(_) => true diff --git a/src/main/scala/de/upb/cs/swt/delphi/crawler/instancemanagement/InstanceRegistry.scala b/src/main/scala/de/upb/cs/swt/delphi/crawler/instancemanagement/InstanceRegistry.scala index de9044b..5332048 100644 --- a/src/main/scala/de/upb/cs/swt/delphi/crawler/instancemanagement/InstanceRegistry.scala +++ b/src/main/scala/de/upb/cs/swt/delphi/crawler/instancemanagement/InstanceRegistry.scala @@ -1,6 +1,10 @@ package de.upb.cs.swt.delphi.crawler.instancemanagement -import de.upb.cs.swt.delphi.crawler.Configuration +import de.upb.cs.swt.delphi.crawler.{Configuration, Crawler} +import de.upb.cs.swt.delphi.crawler.io.swagger.client.api.InstanceApi +import de.upb.cs.swt.delphi.crawler.io.swagger.client.core.ApiInvoker +import de.upb.cs.swt.delphi.crawler.io.swagger.client.model.InstanceEnums.ComponentType +import de.upb.cs.swt.delphi.crawler.io.swagger.client.model.{Instance, InstanceEnums} import scala.util.{Failure, Success, Try} @@ -8,13 +12,18 @@ object InstanceRegistry { def register(Name: String, configuration: Configuration) : Try[Configuration] = { //TODO: Call generated API here - Success(configuration) + val instance = Instance(None, None, Option(configuration.controlServerPort), Option(Name), Option(ComponentType.Crawler)) + val request = InstanceApi.addInstance(instance, configuration.instanceRegistryUri) + implicit val system = Crawler.system + ApiInvoker().execute(request) + //Success(configuration) + Failure(new Exception()) } def retrieveElasticSearchInstance(configuration: Configuration) : Try[String] = { if(!configuration.usingInstanceRegistry) Failure //TODO: Call generated API here - Success("") + Failure(new Exception()) } def sendMatchingResult(isElasticSearchReachable : Boolean, configuration: Configuration) : Try[Unit] = { diff --git a/src/main/scala/de/upb/cs/swt/delphi/crawler/io/swagger/client/api/EnumsSerializers.scala b/src/main/scala/de/upb/cs/swt/delphi/crawler/io/swagger/client/api/EnumsSerializers.scala new file mode 100644 index 0000000..e8bcd5c --- /dev/null +++ b/src/main/scala/de/upb/cs/swt/delphi/crawler/io/swagger/client/api/EnumsSerializers.scala @@ -0,0 +1,45 @@ +/** + * NOTE: This class is auto generated by the akka-scala (beta) swagger code generator program. + * https://github.com/swagger-api/swagger-codegen + * For any issue or feedback, please open a ticket via https://github.com/swagger-api/swagger-codegen/issues/new + */ +package de.upb.cs.swt.delphi.crawler.io.swagger.client.api + +import de.upb.cs.swt.delphi.crawler.io.swagger.client.model._ +import scala.reflect.ClassTag +import org.json4s._ + +object EnumsSerializers { + + def all: Seq[Serializer[_]] = Seq[Serializer[_]]() :+ + new EnumNameSerializer(InstanceEnums.ComponentType) + + private class EnumNameSerializer[E <: Enumeration: ClassTag](enum: E) + extends Serializer[E#Value] { + + import JsonDSL._ + + val EnumerationClass: Class[E#Value] = classOf[E#Value] + + def deserialize(implicit format: Formats): + PartialFunction[(TypeInfo, JValue), E#Value] = { + case (t @ TypeInfo(EnumerationClass, _), json) if isValid(json) => + json match { + case JString(value) => + enum.withName(value) + case value => + throw new MappingException(s"Can't convert $value to $EnumerationClass") + } + } + + private[this] def isValid(json: JValue) = json match { + case JString(value) if enum.values.exists(_.toString == value) => true + case _ => false + } + + def serialize(implicit format: Formats): PartialFunction[Any, JValue] = { + case i: E#Value => i.toString + } + } + +} diff --git a/src/main/scala/de/upb/cs/swt/delphi/crawler/io/swagger/client/api/InstanceApi.scala b/src/main/scala/de/upb/cs/swt/delphi/crawler/io/swagger/client/api/InstanceApi.scala new file mode 100644 index 0000000..5563086 --- /dev/null +++ b/src/main/scala/de/upb/cs/swt/delphi/crawler/io/swagger/client/api/InstanceApi.scala @@ -0,0 +1,113 @@ +/** + * NOTE: This class is auto generated by the akka-scala (beta) swagger code generator program. + * https://github.com/swagger-api/swagger-codegen + * For any issue or feedback, please open a ticket via https://github.com/swagger-api/swagger-codegen/issues/new + */ +package de.upb.cs.swt.delphi.crawler.io.swagger.client.api + +import de.upb.cs.swt.delphi.crawler.io.swagger.client.model.Instance +import de.upb.cs.swt.delphi.crawler.io.swagger.client.model.InstanceID +import de.upb.cs.swt.delphi.crawler.io.swagger.client.core._ +import de.upb.cs.swt.delphi.crawler.io.swagger.client.core.CollectionFormats._ +import de.upb.cs.swt.delphi.crawler.io.swagger.client.core.ApiKeyLocations._ + +object InstanceApi { + + /** + * Register New Instances + * + * Expected answers: + * code 200 : (Registeration Successful) + * code 405 : (Invalid input) + * + * @param registering Data to Register Instance + */ + def addInstance(registering: Instance, basePath: String): ApiRequest[Unit] = + ApiRequest[Unit](ApiMethods.POST, basePath, "/register", "application/json") + .withBody(registering) + .withSuccessResponse[Unit](200) + .withErrorResponse[Unit](405) + /** + * Delete an Instance + * + * Expected answers: + * code 200 : (Sucessfully Deregistered) + * code 400 : (Invalid Status Value) + * code 404 : (Instance not found) + * code 405 : (Validation exception) + * + * @param instance Details of Instance to be deleted + */ + def deleteInstance(instance: InstanceID): ApiRequest[Unit] = + ApiRequest[Unit](ApiMethods.POST, "https://localhost:8085", "/deregister", "application/json") + .withBody(instance) + .withSuccessResponse[Unit](200) + .withErrorResponse[Unit](400) + .withErrorResponse[Unit](404) + .withErrorResponse[Unit](405) + /** + * Fetch Specific Instance + * + * Expected answers: + * code 200 : Seq[Instance] (successful operation) + * code 400 : (Invalid value) + * + * @param componentType + */ + def fetchInstance(componentType: String): ApiRequest[Seq[Instance]] = + ApiRequest[Seq[Instance]](ApiMethods.GET, "https://localhost:8085", "/instances", "application/json") + .withQueryParam("ComponentType", componentType) + .withSuccessResponse[Seq[Instance]](200) + .withErrorResponse[Unit](400) + /** + * How many instances per type are running + * + * Expected answers: + * code 200 : Int (successful operation) + * code 400 : (Invalid ID supplied) + * code 404 : (Instances not found) + * + * @param componentType + */ + def getInstanceNumber(componentType: String): ApiRequest[Int] = + ApiRequest[Int](ApiMethods.GET, "https://localhost:8085", "/numberOfInstances", "application/json") + .withQueryParam("ComponentType", componentType) + .withSuccessResponse[Int](200) + .withErrorResponse[Unit](400) + .withErrorResponse[Unit](404) + /** + * Match the instance + * + * Expected answers: + * code 200 : (successful operation) + * code 400 : (Invalid ID supplied) + * code 404 : (No match found) + * + * @param matchingSuccessful Boolean to indicate if matching is successful + * @param component Registering Instance + */ + def getMatchingInstance(matchingSuccessful: Boolean, component: Instance): ApiRequest[Unit] = + ApiRequest[Unit](ApiMethods.POST, "https://localhost:8085", "/matchingResult", "application/json") + .withBody(component) + .withQueryParam("MatchingSuccessful", matchingSuccessful) + .withSuccessResponse[Unit](200) + .withErrorResponse[Unit](400) + .withErrorResponse[Unit](404) + /** + * + * + * Expected answers: + * code 200 : () + * code 400 : (Invalid status value) + * + * @param componentType Component to be fetched + */ + def matchInstance(componentType: String): ApiRequest[Unit] = + ApiRequest[Unit](ApiMethods.GET, "https://localhost:8085", "/matchingInstance", "application/json") + .withQueryParam("ComponentType", componentType) + .withSuccessResponse[Unit](200) + .withErrorResponse[Unit](400) + + +} + diff --git a/src/main/scala/de/upb/cs/swt/delphi/crawler/io/swagger/client/core/ApiInvoker.scala b/src/main/scala/de/upb/cs/swt/delphi/crawler/io/swagger/client/core/ApiInvoker.scala new file mode 100644 index 0000000..3be096c --- /dev/null +++ b/src/main/scala/de/upb/cs/swt/delphi/crawler/io/swagger/client/core/ApiInvoker.scala @@ -0,0 +1,329 @@ +/** + * NOTE: This class is auto generated by the akka-scala (beta) swagger code generator program. + * https://github.com/swagger-api/swagger-codegen + * For any issue or feedback, please open a ticket via https://github.com/swagger-api/swagger-codegen/issues/new + */ + +package de.upb.cs.swt.delphi.crawler.io.swagger.client.core + +import java.io.File +import java.security.cert.X509Certificate +import javax.net.ssl._ + +import akka.actor.ActorSystem +import akka.io.IO +import akka.pattern.ask +import akka.util.Timeout +import org.joda.time.DateTime +import org.joda.time.format.ISODateTimeFormat +import org.json4s.JsonAST.JString +import org.json4s._ +import org.json4s.jackson.JsonMethods._ +import org.json4s.jackson.Serialization +import spray.can.Http +import spray.can.Http.HostConnectorSetup +import spray.client.pipelining +import spray.client.pipelining._ +import spray.http.HttpEncodings._ +import spray.http.HttpHeaders.{RawHeader, `Accept-Encoding`} +import spray.http.Uri.Query +import spray.http._ +import spray.http.parser.HttpParser +import spray.httpx.encoding.{Deflate, Encoder, Gzip} +import spray.httpx.unmarshalling._ +import spray.io.ClientSSLEngineProvider + +import scala.concurrent.{ExecutionContext, ExecutionContextExecutor, Future} +import scala.reflect.ClassTag +import scala.util.control.NonFatal + +object ApiInvoker { + + def apply()(implicit system: ActorSystem): ApiInvoker = + apply(DefaultFormats + DateTimeSerializer) + def apply(serializers: Iterable[Serializer[_]])(implicit system: ActorSystem): ApiInvoker = + apply(DefaultFormats + DateTimeSerializer ++ serializers) + def apply(formats: Formats)(implicit system: ActorSystem): ApiInvoker = new ApiInvoker(formats) + + case class CustomStatusCode(value: Int, reason: String = "Application-defined status code", isSuccess: Boolean = true) + + def addCustomStatusCode(code: CustomStatusCode): Unit = addCustomStatusCode(code.value, code.reason, code.isSuccess) + + def addCustomStatusCode(code: Int, reason: String = "Application defined code", isSuccess: Boolean = true): Unit = { + StatusCodes.getForKey(code) foreach { _ => + StatusCodes.registerCustom(code, reason, reason, isSuccess, allowsEntity = true) + } + } + + /** + * Allows request execution without calling apiInvoker.execute(request) + * request.response can be used to get a future of the ApiResponse generated. + * request.result can be used to get a future of the expected ApiResponse content. If content doesn't match, a + * Future will failed with a ClassCastException + * @param request the apiRequest to be executed + */ + implicit class ApiRequestImprovements[T](request: ApiRequest[T]) { + + def response(invoker: ApiInvoker)(implicit ec: ExecutionContext, system: ActorSystem): Future[ApiResponse[T]] = + response(ec, system, invoker) + + def response(implicit ec: ExecutionContext, system: ActorSystem, invoker: ApiInvoker): Future[ApiResponse[T]] = + invoker.execute(request) + + def result[U <: T](implicit c: ClassTag[U], ec: ExecutionContext, system: ActorSystem, invoker: ApiInvoker): Future[U] = + invoker.execute(request).map(_.content).mapTo[U] + + } + + /** + * Allows transformation from ApiMethod to spray HttpMethods + * @param method the ApiMethod to be converted + */ + implicit class ApiMethodExtensions(val method: ApiMethod) { + def toSprayMethod: HttpMethod = HttpMethods.getForKey(method.value).getOrElse(HttpMethods.GET) + } + + case object DateTimeSerializer extends CustomSerializer[DateTime](format => ( { + case JString(s) => + ISODateTimeFormat.dateOptionalTimeParser().parseDateTime(s) + }, { + case d: DateTime => + JString(ISODateTimeFormat.dateTime().print(d)) + })) +} + +class ApiInvoker(formats: Formats)(implicit system: ActorSystem) extends UntrustedSslContext with CustomContentTypes { + + import de.upb.cs.swt.delphi.crawler.io.swagger.client.core.ApiInvoker._ + import de.upb.cs.swt.delphi.crawler.io.swagger.client.core.ParametersMap._ + + + implicit val ec: ExecutionContextExecutor = system.dispatcher + implicit val jsonFormats: Formats = formats + + def settings = ApiSettings(system) + + import spray.http.MessagePredicate._ + + val CompressionFilter: MessagePredicate= MessagePredicate({ _ => settings.compressionEnabled}) && + Encoder.DefaultFilter && minEntitySize(settings.compressionSizeThreshold) + + settings.customCodes.foreach(addCustomStatusCode) + + private def addAuthentication(credentialsSeq: Seq[Credentials]): pipelining.RequestTransformer = + request => + credentialsSeq.foldLeft(request) { + case (req, BasicCredentials(login, password)) => + req ~> addCredentials(BasicHttpCredentials(login, password)) + case (req, ApiKeyCredentials(keyValue, keyName, ApiKeyLocations.HEADER)) => + req ~> addHeader(RawHeader(keyName, keyValue.value)) + case (req, _) => req + } + + private def addHeaders(headers: Map[String, Any]): pipelining.RequestTransformer = { request => + + val rawHeaders = for { + (name, value) <- headers.asFormattedParams + header = RawHeader(name, String.valueOf(value)) + } yield header + + request.withHeaders(rawHeaders.toList) + } + + private def bodyPart(name: String, value: Any): BodyPart = { + value match { + case f: File => + BodyPart(f, name) + case v: String => + BodyPart(HttpEntity(String.valueOf(v))) + case NumericValue(v) => + BodyPart(HttpEntity(String.valueOf(v))) + case m: ApiModel => + BodyPart(HttpEntity(Serialization.write(m))) + } + } + + private def formDataContent(request: ApiRequest[_]) = { + val params = request.formParams.asFormattedParams + if (params.isEmpty) + None + else + Some( + normalizedContentType(request.contentType).mediaType match { + case MediaTypes.`multipart/form-data` => + MultipartFormData(params.map { case (name, value) => (name, bodyPart(name, value))}) + case MediaTypes.`application/x-www-form-urlencoded` => + FormData(params.mapValues(String.valueOf)) + case m: MediaType => // Default : application/x-www-form-urlencoded. + FormData(params.mapValues(String.valueOf)) + } + ) + } + + private def bodyContent(request: ApiRequest[_]): Option[Any] = { + request.bodyParam.map(Extraction.decompose).map(compact) + } + + private def createRequest(uri: Uri, request: ApiRequest[_]): HttpRequest = { + + val builder = new RequestBuilder(request.method.toSprayMethod) + val httpRequest = request.method.toSprayMethod match { + case HttpMethods.GET | HttpMethods.DELETE => builder.apply(uri) + case HttpMethods.POST | HttpMethods.PUT => + formDataContent(request) orElse bodyContent(request) match { + case Some(c: FormData) => + builder.apply(uri, c) + case Some(c: MultipartFormData) => + builder.apply(uri, c) + case Some(c: String) => + builder.apply(uri, HttpEntity(normalizedContentType(request.contentType), c)) + case _ => + builder.apply(uri, HttpEntity(normalizedContentType(request.contentType), " ")) + } + case _ => builder.apply(uri) + } + + httpRequest ~> + addHeaders(request.headerParams) ~> + addAuthentication(request.credentials) ~> + encode(Gzip(CompressionFilter)) + } + + def makeQuery(r: ApiRequest[_]): Query = { + r.credentials.foldLeft(r.queryParams) { + case (params, ApiKeyCredentials(key, keyName, ApiKeyLocations.QUERY)) => + params + (keyName -> key.value) + case (params, _) => params + }.asFormattedParams + .mapValues(String.valueOf) + .foldRight[Query](Uri.Query.Empty) { + case ((name, value), acc) => acc.+:(name, value) + } + } + + def makeUri(r: ApiRequest[_]): Uri = { + val opPath = r.operationPath.replaceAll("\\{format\\}", "json") + val opPathWithParams = r.pathParams.asFormattedParams + .mapValues(String.valueOf) + .foldLeft(opPath) { + case (path, (name, value)) => path.replaceAll(s"\\{$name\\}", value) + } + val query = makeQuery(r) + + Uri(r.basePath + opPathWithParams).withQuery(query) + } + + def execute[T](r: ApiRequest[T]): Future[ApiResponse[T]] = { + try { + implicit val timeout: Timeout = settings.connectionTimeout + + val uri = makeUri(r) + + val connector = HostConnectorSetup( + uri.authority.host.toString, + uri.effectivePort, + sslEncryption = "https".equals(uri.scheme), + defaultHeaders = settings.defaultHeaders ++ List(`Accept-Encoding`(gzip, deflate))) + + val request = createRequest(uri, r) + + for { + Http.HostConnectorInfo(hostConnector, _) <- IO(Http) ? connector + response <- hostConnector.ask(request).mapTo[HttpResponse] + } yield { + response ~> decode(Deflate) ~> decode(Gzip) ~> unmarshallApiResponse(r) + } + } + catch { + case NonFatal(x) => Future.failed(x) + } + } + + def unmarshallApiResponse[T](request: ApiRequest[T])(response: HttpResponse): ApiResponse[T] = { + request.responseForCode(response.status.intValue) match { + case Some( (manifest: Manifest[T], state: ResponseState) ) => + entityUnmarshaller(manifest)(response.entity) match { + case Right(value) ⇒ + state match { + case ResponseState.Success => + ApiResponse(response.status.intValue, value, response.headers.map(header => (header.name, header.value)).toMap) + case ResponseState.Error => + throw ApiError(response.status.intValue, "Error response received", + Some(value), + headers = response.headers.map(header => (header.name, header.value)).toMap) + } + + case Left(MalformedContent(error, Some(cause))) ⇒ + throw ApiError(response.status.intValue, s"Unable to unmarshall content to [$manifest]", Some(response.entity.toString), cause) + + case Left(MalformedContent(error, None)) ⇒ + throw ApiError(response.status.intValue, s"Unable to unmarshall content to [$manifest]", Some(response.entity.toString)) + + case Left(ContentExpected) ⇒ + throw ApiError(response.status.intValue, s"Unable to unmarshall empty response to [$manifest]", Some(response.entity.toString)) + } + + case _ => throw ApiError(response.status.intValue, "Unexpected response code", Some(response.entity.toString)) + } + } + + def entityUnmarshaller[T](implicit mf: Manifest[T]): Unmarshaller[T] = + Unmarshaller[T](MediaTypes.`application/json`) { + case x: HttpEntity.NonEmpty ⇒ + parse(x.asString(defaultCharset = HttpCharsets.`UTF-8`)) + .noNulls + .camelizeKeys + .extract[T] + } + +} + +sealed trait CustomContentTypes { + + def normalizedContentType(original: String): ContentType = + MediaTypes.forExtension(original) map (ContentType(_)) getOrElse parseContentType(original) + + def parseContentType(contentType: String): ContentType = { + val contentTypeAsRawHeader = HttpHeaders.RawHeader("Content-Type", contentType) + val parsedContentTypeHeader = HttpParser.parseHeader(contentTypeAsRawHeader) + (parsedContentTypeHeader: @unchecked) match { + case Right(ct: HttpHeaders.`Content-Type`) => + ct.contentType + case Left(error: ErrorInfo) => + throw new IllegalArgumentException( + s"Error converting '$contentType' to a ContentType header: '${error.summary}'") + } + } +} + +sealed trait UntrustedSslContext { + this: ApiInvoker => + + implicit lazy val trustfulSslContext: SSLContext = { + settings.alwaysTrustCertificates match { + case false => + SSLContext.getDefault + + case true => + class IgnoreX509TrustManager extends X509TrustManager { + def checkClientTrusted(chain: Array[X509Certificate], authType: String): Unit = {} + + def checkServerTrusted(chain: Array[X509Certificate], authType: String): Unit = {} + + def getAcceptedIssuers = null + } + + val context = SSLContext.getInstance("TLS") + context.init(null, Array(new IgnoreX509TrustManager), null) + context + } + } + + implicit val clientSSLEngineProvider = + ClientSSLEngineProvider { + _ => + val engine = trustfulSslContext.createSSLEngine() + engine.setUseClientMode(true) + engine + } +} diff --git a/src/main/scala/de/upb/cs/swt/delphi/crawler/io/swagger/client/core/ApiRequest.scala b/src/main/scala/de/upb/cs/swt/delphi/crawler/io/swagger/client/core/ApiRequest.scala new file mode 100644 index 0000000..76e4a16 --- /dev/null +++ b/src/main/scala/de/upb/cs/swt/delphi/crawler/io/swagger/client/core/ApiRequest.scala @@ -0,0 +1,59 @@ +/** + * NOTE: This class is auto generated by the akka-scala (beta) swagger code generator program. + * https://github.com/swagger-api/swagger-codegen + * For any issue or feedback, please open a ticket via https://github.com/swagger-api/swagger-codegen/issues/new + */ +package de.upb.cs.swt.delphi.crawler.io.swagger.client.core + +sealed trait ResponseState + +object ResponseState { + + case object Success extends ResponseState + + case object Error extends ResponseState + +} + +case class ApiRequest[U]( + // required fields + method: ApiMethod, + basePath: String, + operationPath: String, + contentType: String, + + // optional fields + responses: Map[Int, (Manifest[_], ResponseState)] = Map.empty, + bodyParam: Option[Any] = None, + formParams: Map[String, Any] = Map.empty, + pathParams: Map[String, Any] = Map.empty, + queryParams: Map[String, Any] = Map.empty, + headerParams: Map[String, Any] = Map.empty, + credentials: Seq[Credentials] = List.empty) { + + def withCredentials(cred: Credentials): ApiRequest[U] = copy[U](credentials = credentials :+ cred) + + def withApiKey(key: ApiKeyValue, keyName: String, location: ApiKeyLocation): ApiRequest[U] = withCredentials(ApiKeyCredentials(key, keyName, location)) + + def withSuccessResponse[T](code: Int)(implicit m: Manifest[T]): ApiRequest[U] = copy[U](responses = responses + (code -> (m, ResponseState.Success))) + + def withErrorResponse[T](code: Int)(implicit m: Manifest[T]): ApiRequest[U] = copy[U](responses = responses + (code -> (m, ResponseState.Error))) + + def withDefaultSuccessResponse[T](implicit m: Manifest[T]): ApiRequest[U] = withSuccessResponse[T](0) + + def withDefaultErrorResponse[T](implicit m: Manifest[T]): ApiRequest[U] = withErrorResponse[T](0) + + def responseForCode(statusCode: Int): Option[(Manifest[_], ResponseState)] = responses.get(statusCode) orElse responses.get(0) + + def withoutBody(): ApiRequest[U] = copy[U](bodyParam = None) + + def withBody(body: Any): ApiRequest[U] = copy[U](bodyParam = Some(body)) + + def withFormParam(name: String, value: Any): ApiRequest[U] = copy[U](formParams = formParams + (name -> value)) + + def withPathParam(name: String, value: Any): ApiRequest[U] = copy[U](pathParams = pathParams + (name -> value)) + + def withQueryParam(name: String, value: Any): ApiRequest[U] = copy[U](queryParams = queryParams + (name -> value)) + + def withHeaderParam(name: String, value: Any): ApiRequest[U] = copy[U](headerParams = headerParams + (name -> value)) +} diff --git a/src/main/scala/de/upb/cs/swt/delphi/crawler/io/swagger/client/core/ApiSettings.scala b/src/main/scala/de/upb/cs/swt/delphi/crawler/io/swagger/client/core/ApiSettings.scala new file mode 100644 index 0000000..270c78d --- /dev/null +++ b/src/main/scala/de/upb/cs/swt/delphi/crawler/io/swagger/client/core/ApiSettings.scala @@ -0,0 +1,36 @@ +/** + * NOTE: This class is auto generated by the akka-scala (beta) swagger code generator program. + * https://github.com/swagger-api/swagger-codegen + * For any issue or feedback, please open a ticket via https://github.com/swagger-api/swagger-codegen/issues/new + */ +package de.upb.cs.swt.delphi.crawler.io.swagger.client.core + +import java.util.concurrent.TimeUnit + +import akka.actor.{ExtendedActorSystem, Extension, ExtensionKey} +import com.typesafe.config.Config +import de.upb.cs.swt.delphi.crawler.io.swagger.client.core.ApiInvoker.CustomStatusCode +import spray.http.HttpHeaders.RawHeader + +import scala.collection.JavaConversions._ +import scala.concurrent.duration.FiniteDuration + +class ApiSettings(config: Config) extends Extension { + def this(system: ExtendedActorSystem) = this(system.settings.config) + + private def cfg = config.getConfig("io.swagger.client.apiRequest") + + val alwaysTrustCertificates: Boolean = cfg.getBoolean("trust-certificates") + val defaultHeaders: List[RawHeader] = cfg.getConfig("default-headers").entrySet.toList.map(c => RawHeader(c.getKey, c.getValue.render)) + val connectionTimeout = FiniteDuration(cfg.getDuration("connection-timeout", TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS) + val compressionEnabled: Boolean = cfg.getBoolean("compression.enabled") + val compressionSizeThreshold: Int = cfg.getBytes("compression.size-threshold").toInt + val customCodes: List[CustomStatusCode] = cfg.getConfigList("custom-codes").toList.map { c => + CustomStatusCode( + c.getInt("code"), + c.getString("reason"), + c.getBoolean("success")) + } +} + +object ApiSettings extends ExtensionKey[ApiSettings] \ No newline at end of file diff --git a/src/main/scala/de/upb/cs/swt/delphi/crawler/io/swagger/client/core/requests.scala b/src/main/scala/de/upb/cs/swt/delphi/crawler/io/swagger/client/core/requests.scala new file mode 100644 index 0000000..98e29c4 --- /dev/null +++ b/src/main/scala/de/upb/cs/swt/delphi/crawler/io/swagger/client/core/requests.scala @@ -0,0 +1,187 @@ +/** + * NOTE: This class is auto generated by the akka-scala (beta) swagger code generator program. + * https://github.com/swagger-api/swagger-codegen + * For any issue or feedback, please open a ticket via https://github.com/swagger-api/swagger-codegen/issues/new + */ +package de.upb.cs.swt.delphi.crawler.io.swagger.client.core + +import java.io.File +import java.net.URLEncoder + +import scala.util.Try + +sealed trait ApiReturnWithHeaders { + def headers: Map[String, String] + + def header(name: String): Option[String] = headers.get(name) + + def getStringHeader(name: String): Option[String] = header(name) + + // workaround: return date time header in string instead of datetime object + def getDateTimeHeader(name: String): Option[String] = header(name) + + def getIntHeader(name: String): Option[Int] = castedHeader(name, java.lang.Integer.parseInt) + + def getLongHeader(name: String): Option[Long] = castedHeader(name, java.lang.Long.parseLong) + + def getFloatHeader(name: String): Option[Float] = castedHeader(name, java.lang.Float.parseFloat) + + def getDoubleHeader(name: String): Option[Double] = castedHeader(name, java.lang.Double.parseDouble) + + def getBooleanHeader(name: String): Option[Boolean] = castedHeader(name, java.lang.Boolean.parseBoolean) + + private def castedHeader[U](name: String, conversion: String => U): Option[U] = { + Try { + header(name).map(conversion) + }.get + } +} + +sealed case class ApiResponse[T](code: Int, content: T, headers: Map[String, String] = Map.empty) + extends ApiReturnWithHeaders + +sealed case class ApiError[T](code: Int, message: String, responseContent: Option[T], cause: Throwable = null, headers: Map[String, String] = Map.empty) + extends Throwable(s"($code) $message.${responseContent.map(s => s" Content : $s").getOrElse("")}", cause) + with ApiReturnWithHeaders + +sealed case class ApiMethod(value: String) + +object ApiMethods { + val CONNECT = ApiMethod("CONNECT") + val DELETE = ApiMethod("DELETE") + val GET = ApiMethod("GET") + val HEAD = ApiMethod("HEAD") + val OPTIONS = ApiMethod("OPTIONS") + val PATCH = ApiMethod("PATCH") + val POST = ApiMethod("POST") + val PUT = ApiMethod("PUT") + val TRACE = ApiMethod("TRACE") +} + +/** + * This trait needs to be added to any model defined by the api. + */ +trait ApiModel + +/** + * Single trait defining a credential that can be transformed to a paramName / paramValue tupple + */ +sealed trait Credentials { + def asQueryParam: Option[(String, String)] = None +} + +sealed case class BasicCredentials(user: String, password: String) extends Credentials + +sealed case class ApiKeyCredentials(key: ApiKeyValue, keyName: String, location: ApiKeyLocation) extends Credentials { + override def asQueryParam: Option[(String, String)] = location match { + case ApiKeyLocations.QUERY => Some((keyName, key.value)) + case _ => None + } +} + +sealed case class ApiKeyValue(value: String) + +sealed trait ApiKeyLocation + +object ApiKeyLocations { + + case object QUERY extends ApiKeyLocation + + case object HEADER extends ApiKeyLocation + +} + + +/** + * Case class used to unapply numeric values only in pattern matching + * + * @param value the string representation of the numeric value + */ +sealed case class NumericValue(value: String) { + override def toString: String = value +} + +object NumericValue { + def unapply(n: Any): Option[NumericValue] = n match { + case (_: Int | _: Long | _: Float | _: Double | _: Boolean | _: Byte) => Some(NumericValue(String.valueOf(n))) + case _ => None + } +} + +/** + * Used for params being arrays + */ +sealed case class ArrayValues(values: Seq[Any], format: CollectionFormat = CollectionFormats.CSV) + +object ArrayValues { + def apply(values: Option[Seq[Any]], format: CollectionFormat): ArrayValues = + ArrayValues(values.getOrElse(Seq.empty), format) + + def apply(values: Option[Seq[Any]]): ArrayValues = ArrayValues(values, CollectionFormats.CSV) +} + + +/** + * Defines how arrays should be rendered in query strings. + */ +sealed trait CollectionFormat + +trait MergedArrayFormat extends CollectionFormat { + def separator: String +} + +object CollectionFormats { + + case object CSV extends MergedArrayFormat { + override val separator = "," + } + + case object TSV extends MergedArrayFormat { + override val separator = "\t" + } + + case object SSV extends MergedArrayFormat { + override val separator = " " + } + + case object PIPES extends MergedArrayFormat { + override val separator = "|" + } + + case object MULTI extends CollectionFormat + +} + +object ParametersMap { + + /** + * Pimp parameters maps (Map[String, Any]) in order to transform them in a sequence of String -> Any tupples, + * with valid url-encoding, arrays handling, files preservation, ... + */ + implicit class ParametersMapImprovements(val m: Map[String, Any]) { + + def asFormattedParamsList: List[(String, Any)] = m.toList.flatMap(formattedParams) + + def asFormattedParams: Map[String, Any] = m.flatMap(formattedParams) + + private def urlEncode(v: Any) = URLEncoder.encode(String.valueOf(v), "utf-8").replaceAll("\\+", "%20") + + private def formattedParams(tuple: (String, Any)): Seq[(String, Any)] = formattedParams(tuple._1, tuple._2) + + private def formattedParams(name: String, value: Any): Seq[(String, Any)] = value match { + case arr: ArrayValues => + arr.format match { + case CollectionFormats.MULTI => arr.values.flatMap(formattedParams(name, _)) + case format: MergedArrayFormat => Seq((name, arr.values.mkString(format.separator))) + } + case None => Seq.empty + case Some(opt) => formattedParams(name, opt) + case s: Seq[Any] => formattedParams(name, ArrayValues(s)) + case v: String => Seq((name, urlEncode(v))) + case NumericValue(v) => Seq((name, urlEncode(v))) + case f: File => Seq((name, f)) + case m: ApiModel => Seq((name, m)) + } + } + +} diff --git a/src/main/scala/de/upb/cs/swt/delphi/crawler/io/swagger/client/model/Instance.scala b/src/main/scala/de/upb/cs/swt/delphi/crawler/io/swagger/client/model/Instance.scala new file mode 100644 index 0000000..8888d68 --- /dev/null +++ b/src/main/scala/de/upb/cs/swt/delphi/crawler/io/swagger/client/model/Instance.scala @@ -0,0 +1,33 @@ +/** + * NOTE: This class is auto generated by the akka-scala (beta) swagger code generator program. + * https://github.com/swagger-api/swagger-codegen + * For any issue or feedback, please open a ticket via https://github.com/swagger-api/swagger-codegen/issues/new + */ + +package de.upb.cs.swt.delphi.crawler.io.swagger.client.model + +import org.joda.time.DateTime +import java.util.UUID +import de.upb.cs.swt.delphi.crawler.io.swagger.client.core.ApiModel + +case class Instance ( + iD: Option[Long], + iP: Option[String], + portnumber: Option[Long], + name: Option[String], + /* Component Type */ + componentType: Option[InstanceEnums.ComponentType] +) extends ApiModel + +object InstanceEnums { + + type ComponentType = ComponentType.Value + object ComponentType extends Enumeration { + val Crawler = Value("Crawler") + val WebApi = Value("WebApi") + val WebApp = Value("WebApp") + val DelphiManagement = Value("DelphiManagement") + } + +} + diff --git a/src/main/scala/de/upb/cs/swt/delphi/crawler/io/swagger/client/model/InstanceID.scala b/src/main/scala/de/upb/cs/swt/delphi/crawler/io/swagger/client/model/InstanceID.scala new file mode 100644 index 0000000..020f159 --- /dev/null +++ b/src/main/scala/de/upb/cs/swt/delphi/crawler/io/swagger/client/model/InstanceID.scala @@ -0,0 +1,15 @@ +/** + * NOTE: This class is auto generated by the akka-scala (beta) swagger code generator program. + * https://github.com/swagger-api/swagger-codegen + * For any issue or feedback, please open a ticket via https://github.com/swagger-api/swagger-codegen/issues/new + */ + +package de.upb.cs.swt.delphi.crawler.io.swagger.client.model + +import de.upb.cs.swt.delphi.crawler.io.swagger.client.core.ApiModel + +case class InstanceID ( + iD: Option[Long] +) extends ApiModel + + From bb3d60535bb4771e305c080035f8ec3e5cdb098b Mon Sep 17 00:00:00 2001 From: Johannes Duesing Date: Tue, 28 Aug 2018 13:41:44 +0200 Subject: [PATCH 04/20] Used akka http to call IR instead of outdated spray http --- build.sbt | 6 ++- .../cs/swt/delphi/crawler/Configuration.scala | 8 +-- .../instancemanagement/InstanceRegistry.scala | 49 ++++++++++++++++--- .../io/swagger/client/model/Instance.scala | 24 +++++++-- 4 files changed, 67 insertions(+), 20 deletions(-) diff --git a/build.sbt b/build.sbt index 941cd18..032feff 100644 --- a/build.sbt +++ b/build.sbt @@ -33,8 +33,9 @@ libraryDependencies ++= Seq( "com.typesafe.akka" %% "akka-stream-testkit" % akkaVersion % Test, "com.typesafe.akka" %% "akka-slf4j" % akkaVersion, "com.typesafe.akka" %% "akka-http" % "10.1.3", - "org.json4s" %% "json4s-jackson" % "3.6.0", - "io.swagger" % "swagger-core" % "1.5.21", + "com.typesafe.akka" %% "akka-http-spray-json" % "10.0.8", + "org.json4s" %% "json4s-jackson" % "3.5.3", + "io.swagger" % "swagger-core" % "1.5.15", "io.spray" % "spray-client" % "1.3.1" ) @@ -56,6 +57,7 @@ libraryDependencies ++= Seq( ) resolvers += "Sonatype OSS Snapshots" at "https://oss.sonatype.org/content/repositories/snapshots" +resolvers ++= Seq(Resolver.mavenLocal) val opalVersion = "1.0.0" libraryDependencies ++= Seq( diff --git a/src/main/scala/de/upb/cs/swt/delphi/crawler/Configuration.scala b/src/main/scala/de/upb/cs/swt/delphi/crawler/Configuration.scala index 6f5ebea..1f77610 100644 --- a/src/main/scala/de/upb/cs/swt/delphi/crawler/Configuration.scala +++ b/src/main/scala/de/upb/cs/swt/delphi/crawler/Configuration.scala @@ -22,13 +22,7 @@ class Configuration { val limit : Int = 50 val instanceRegistryUri : String = sys.env.getOrElse("DELPHI_IR_URI", "http://localhost:9500") - lazy val usingInstanceRegistry = InstanceRegistry.register("MyCrawlerInstance",this) match { - case Success(_) => true - case Failure(_) => { - println(s"Failed to connect to Instance Registry at ${instanceRegistryUri}. Using default configuration instead.") - false - } - } + lazy val usingInstanceRegistry = InstanceRegistry.register("MyCrawlerInstance",this) case class Throttle(element : Int, per : FiniteDuration, maxBurst : Int, mode : ThrottleMode) } diff --git a/src/main/scala/de/upb/cs/swt/delphi/crawler/instancemanagement/InstanceRegistry.scala b/src/main/scala/de/upb/cs/swt/delphi/crawler/instancemanagement/InstanceRegistry.scala index 5332048..40c5c5e 100644 --- a/src/main/scala/de/upb/cs/swt/delphi/crawler/instancemanagement/InstanceRegistry.scala +++ b/src/main/scala/de/upb/cs/swt/delphi/crawler/instancemanagement/InstanceRegistry.scala @@ -1,23 +1,48 @@ package de.upb.cs.swt.delphi.crawler.instancemanagement +import akka.actor.ActorSystem +import akka.http.scaladsl.Http +import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport +import akka.http.scaladsl.marshalling.Marshal +import akka.http.scaladsl.model._ +import akka.http.scaladsl.model.headers.RawHeader import de.upb.cs.swt.delphi.crawler.{Configuration, Crawler} import de.upb.cs.swt.delphi.crawler.io.swagger.client.api.InstanceApi import de.upb.cs.swt.delphi.crawler.io.swagger.client.core.ApiInvoker import de.upb.cs.swt.delphi.crawler.io.swagger.client.model.InstanceEnums.ComponentType -import de.upb.cs.swt.delphi.crawler.io.swagger.client.model.{Instance, InstanceEnums} +import de.upb.cs.swt.delphi.crawler.io.swagger.client.model.{Instance, InstanceEnums, JsonSupport} +import spray.json.DefaultJsonProtocol +import scala.concurrent.duration.Duration +import scala.concurrent.{Await, ExecutionContext, Future} import scala.util.{Failure, Success, Try} -object InstanceRegistry +object InstanceRegistry extends JsonSupport { - def register(Name: String, configuration: Configuration) : Try[Configuration] = { - //TODO: Call generated API here + def register(Name: String, configuration: Configuration) : Boolean = { + val instance = Instance(None, None, Option(configuration.controlServerPort), Option(Name), Option(ComponentType.Crawler)) - val request = InstanceApi.addInstance(instance, configuration.instanceRegistryUri) + implicit val system = Crawler.system - ApiInvoker().execute(request) - //Success(configuration) - Failure(new Exception()) + implicit val ec = system.dispatcher + implicit val materializer = Crawler.materializer + + Await.result(postInstance(instance, configuration.instanceRegistryUri) map {response => + if(response.status == StatusCodes.OK || response.status == StatusCodes.Accepted) + { + println("Successfully registered at Instance Registry.") + true + } + else { + val statuscode = response.status + println(s"Failed to register at Instance Registry, server returned $statuscode") + false + } + + } recover {case ex => + println(s"Failed to register at Instance Registry, exception: $ex") + false + }, Duration.Inf) } def retrieveElasticSearchInstance(configuration: Configuration) : Try[String] = { @@ -31,4 +56,12 @@ object InstanceRegistry //TODO: Call generated API here Success() } + + def postInstance(instance : Instance, uri: String) (implicit system: ActorSystem, ec : ExecutionContext) : Future[HttpResponse] = + Marshal(instance).to[RequestEntity] flatMap { entity => + val request = HttpRequest(method = HttpMethods.POST, uri = uri, entity = entity) + println("*****REQUEST: "+ request.toString()) + Http(system).singleRequest(request) + } + } diff --git a/src/main/scala/de/upb/cs/swt/delphi/crawler/io/swagger/client/model/Instance.scala b/src/main/scala/de/upb/cs/swt/delphi/crawler/io/swagger/client/model/Instance.scala index 8888d68..f12e71a 100644 --- a/src/main/scala/de/upb/cs/swt/delphi/crawler/io/swagger/client/model/Instance.scala +++ b/src/main/scala/de/upb/cs/swt/delphi/crawler/io/swagger/client/model/Instance.scala @@ -6,11 +6,29 @@ package de.upb.cs.swt.delphi.crawler.io.swagger.client.model -import org.joda.time.DateTime -import java.util.UUID +import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport import de.upb.cs.swt.delphi.crawler.io.swagger.client.core.ApiModel +import spray.json._ -case class Instance ( +trait JsonSupport extends SprayJsonSupport with DefaultJsonProtocol { + implicit val componentTypeFormat = new JsonFormat[InstanceEnums.ComponentType] { + def write(compType : InstanceEnums.ComponentType) = JsString(compType.toString()) + + def read(value: JsValue) = value match { + case JsString(s) => s match { + case "Crawler" => InstanceEnums.ComponentType.Crawler + case "WebApi" => InstanceEnums.ComponentType.WebApi + case "WebApp" => InstanceEnums.ComponentType.WebApp + case "DelphiManagement" => InstanceEnums.ComponentType.DelphiManagement + case x => throw new RuntimeException(s"Unexpected string value $x for component type.") + } + case y => throw new RuntimeException(s"Unexpected type $y while deserializing component type.") + } + } + implicit val instanceFormat = jsonFormat5(Instance) +} + +final case class Instance ( iD: Option[Long], iP: Option[String], portnumber: Option[Long], From c2c9801a62523780dbb1b8991f9939f6d99460c9 Mon Sep 17 00:00:00 2001 From: Johannes Duesing Date: Tue, 28 Aug 2018 14:14:50 +0200 Subject: [PATCH 05/20] Removed unused swagger code, used logger where println was used --- .../instancemanagement/InstanceRegistry.scala | 18 +- .../swagger/client/api/EnumsSerializers.scala | 45 --- .../io/swagger/client/api/InstanceApi.scala | 113 ------ .../io/swagger/client/core/ApiInvoker.scala | 329 ------------------ .../io/swagger/client/core/ApiRequest.scala | 59 ---- .../io/swagger/client/core/ApiSettings.scala | 36 -- .../io/swagger/client/core/requests.scala | 187 ---------- .../io/swagger/client/model/Instance.scala | 3 +- .../io/swagger/client/model/InstanceID.scala | 15 - 9 files changed, 7 insertions(+), 798 deletions(-) delete mode 100644 src/main/scala/de/upb/cs/swt/delphi/crawler/io/swagger/client/api/EnumsSerializers.scala delete mode 100644 src/main/scala/de/upb/cs/swt/delphi/crawler/io/swagger/client/api/InstanceApi.scala delete mode 100644 src/main/scala/de/upb/cs/swt/delphi/crawler/io/swagger/client/core/ApiInvoker.scala delete mode 100644 src/main/scala/de/upb/cs/swt/delphi/crawler/io/swagger/client/core/ApiRequest.scala delete mode 100644 src/main/scala/de/upb/cs/swt/delphi/crawler/io/swagger/client/core/ApiSettings.scala delete mode 100644 src/main/scala/de/upb/cs/swt/delphi/crawler/io/swagger/client/core/requests.scala delete mode 100644 src/main/scala/de/upb/cs/swt/delphi/crawler/io/swagger/client/model/InstanceID.scala diff --git a/src/main/scala/de/upb/cs/swt/delphi/crawler/instancemanagement/InstanceRegistry.scala b/src/main/scala/de/upb/cs/swt/delphi/crawler/instancemanagement/InstanceRegistry.scala index 40c5c5e..1018fb9 100644 --- a/src/main/scala/de/upb/cs/swt/delphi/crawler/instancemanagement/InstanceRegistry.scala +++ b/src/main/scala/de/upb/cs/swt/delphi/crawler/instancemanagement/InstanceRegistry.scala @@ -2,22 +2,17 @@ package de.upb.cs.swt.delphi.crawler.instancemanagement import akka.actor.ActorSystem import akka.http.scaladsl.Http -import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport import akka.http.scaladsl.marshalling.Marshal import akka.http.scaladsl.model._ -import akka.http.scaladsl.model.headers.RawHeader -import de.upb.cs.swt.delphi.crawler.{Configuration, Crawler} -import de.upb.cs.swt.delphi.crawler.io.swagger.client.api.InstanceApi -import de.upb.cs.swt.delphi.crawler.io.swagger.client.core.ApiInvoker +import de.upb.cs.swt.delphi.crawler.{AppLogging, Configuration, Crawler} import de.upb.cs.swt.delphi.crawler.io.swagger.client.model.InstanceEnums.ComponentType -import de.upb.cs.swt.delphi.crawler.io.swagger.client.model.{Instance, InstanceEnums, JsonSupport} -import spray.json.DefaultJsonProtocol +import de.upb.cs.swt.delphi.crawler.io.swagger.client.model.{Instance, JsonSupport} import scala.concurrent.duration.Duration import scala.concurrent.{Await, ExecutionContext, Future} import scala.util.{Failure, Success, Try} -object InstanceRegistry extends JsonSupport +object InstanceRegistry extends JsonSupport with AppLogging { def register(Name: String, configuration: Configuration) : Boolean = { @@ -30,17 +25,17 @@ object InstanceRegistry extends JsonSupport Await.result(postInstance(instance, configuration.instanceRegistryUri) map {response => if(response.status == StatusCodes.OK || response.status == StatusCodes.Accepted) { - println("Successfully registered at Instance Registry.") + log.info("Successfully registered at Instance Registry.") true } else { val statuscode = response.status - println(s"Failed to register at Instance Registry, server returned $statuscode") + log.warning(s"Failed to register at Instance Registry, server returned $statuscode") false } } recover {case ex => - println(s"Failed to register at Instance Registry, exception: $ex") + log.warning(s"Failed to register at Instance Registry, exception: $ex") false }, Duration.Inf) } @@ -60,7 +55,6 @@ object InstanceRegistry extends JsonSupport def postInstance(instance : Instance, uri: String) (implicit system: ActorSystem, ec : ExecutionContext) : Future[HttpResponse] = Marshal(instance).to[RequestEntity] flatMap { entity => val request = HttpRequest(method = HttpMethods.POST, uri = uri, entity = entity) - println("*****REQUEST: "+ request.toString()) Http(system).singleRequest(request) } diff --git a/src/main/scala/de/upb/cs/swt/delphi/crawler/io/swagger/client/api/EnumsSerializers.scala b/src/main/scala/de/upb/cs/swt/delphi/crawler/io/swagger/client/api/EnumsSerializers.scala deleted file mode 100644 index e8bcd5c..0000000 --- a/src/main/scala/de/upb/cs/swt/delphi/crawler/io/swagger/client/api/EnumsSerializers.scala +++ /dev/null @@ -1,45 +0,0 @@ -/** - * NOTE: This class is auto generated by the akka-scala (beta) swagger code generator program. - * https://github.com/swagger-api/swagger-codegen - * For any issue or feedback, please open a ticket via https://github.com/swagger-api/swagger-codegen/issues/new - */ -package de.upb.cs.swt.delphi.crawler.io.swagger.client.api - -import de.upb.cs.swt.delphi.crawler.io.swagger.client.model._ -import scala.reflect.ClassTag -import org.json4s._ - -object EnumsSerializers { - - def all: Seq[Serializer[_]] = Seq[Serializer[_]]() :+ - new EnumNameSerializer(InstanceEnums.ComponentType) - - private class EnumNameSerializer[E <: Enumeration: ClassTag](enum: E) - extends Serializer[E#Value] { - - import JsonDSL._ - - val EnumerationClass: Class[E#Value] = classOf[E#Value] - - def deserialize(implicit format: Formats): - PartialFunction[(TypeInfo, JValue), E#Value] = { - case (t @ TypeInfo(EnumerationClass, _), json) if isValid(json) => - json match { - case JString(value) => - enum.withName(value) - case value => - throw new MappingException(s"Can't convert $value to $EnumerationClass") - } - } - - private[this] def isValid(json: JValue) = json match { - case JString(value) if enum.values.exists(_.toString == value) => true - case _ => false - } - - def serialize(implicit format: Formats): PartialFunction[Any, JValue] = { - case i: E#Value => i.toString - } - } - -} diff --git a/src/main/scala/de/upb/cs/swt/delphi/crawler/io/swagger/client/api/InstanceApi.scala b/src/main/scala/de/upb/cs/swt/delphi/crawler/io/swagger/client/api/InstanceApi.scala deleted file mode 100644 index 5563086..0000000 --- a/src/main/scala/de/upb/cs/swt/delphi/crawler/io/swagger/client/api/InstanceApi.scala +++ /dev/null @@ -1,113 +0,0 @@ -/** - * NOTE: This class is auto generated by the akka-scala (beta) swagger code generator program. - * https://github.com/swagger-api/swagger-codegen - * For any issue or feedback, please open a ticket via https://github.com/swagger-api/swagger-codegen/issues/new - */ -package de.upb.cs.swt.delphi.crawler.io.swagger.client.api - -import de.upb.cs.swt.delphi.crawler.io.swagger.client.model.Instance -import de.upb.cs.swt.delphi.crawler.io.swagger.client.model.InstanceID -import de.upb.cs.swt.delphi.crawler.io.swagger.client.core._ -import de.upb.cs.swt.delphi.crawler.io.swagger.client.core.CollectionFormats._ -import de.upb.cs.swt.delphi.crawler.io.swagger.client.core.ApiKeyLocations._ - -object InstanceApi { - - /** - * Register New Instances - * - * Expected answers: - * code 200 : (Registeration Successful) - * code 405 : (Invalid input) - * - * @param registering Data to Register Instance - */ - def addInstance(registering: Instance, basePath: String): ApiRequest[Unit] = - ApiRequest[Unit](ApiMethods.POST, basePath, "/register", "application/json") - .withBody(registering) - .withSuccessResponse[Unit](200) - .withErrorResponse[Unit](405) - /** - * Delete an Instance - * - * Expected answers: - * code 200 : (Sucessfully Deregistered) - * code 400 : (Invalid Status Value) - * code 404 : (Instance not found) - * code 405 : (Validation exception) - * - * @param instance Details of Instance to be deleted - */ - def deleteInstance(instance: InstanceID): ApiRequest[Unit] = - ApiRequest[Unit](ApiMethods.POST, "https://localhost:8085", "/deregister", "application/json") - .withBody(instance) - .withSuccessResponse[Unit](200) - .withErrorResponse[Unit](400) - .withErrorResponse[Unit](404) - .withErrorResponse[Unit](405) - /** - * Fetch Specific Instance - * - * Expected answers: - * code 200 : Seq[Instance] (successful operation) - * code 400 : (Invalid value) - * - * @param componentType - */ - def fetchInstance(componentType: String): ApiRequest[Seq[Instance]] = - ApiRequest[Seq[Instance]](ApiMethods.GET, "https://localhost:8085", "/instances", "application/json") - .withQueryParam("ComponentType", componentType) - .withSuccessResponse[Seq[Instance]](200) - .withErrorResponse[Unit](400) - /** - * How many instances per type are running - * - * Expected answers: - * code 200 : Int (successful operation) - * code 400 : (Invalid ID supplied) - * code 404 : (Instances not found) - * - * @param componentType - */ - def getInstanceNumber(componentType: String): ApiRequest[Int] = - ApiRequest[Int](ApiMethods.GET, "https://localhost:8085", "/numberOfInstances", "application/json") - .withQueryParam("ComponentType", componentType) - .withSuccessResponse[Int](200) - .withErrorResponse[Unit](400) - .withErrorResponse[Unit](404) - /** - * Match the instance - * - * Expected answers: - * code 200 : (successful operation) - * code 400 : (Invalid ID supplied) - * code 404 : (No match found) - * - * @param matchingSuccessful Boolean to indicate if matching is successful - * @param component Registering Instance - */ - def getMatchingInstance(matchingSuccessful: Boolean, component: Instance): ApiRequest[Unit] = - ApiRequest[Unit](ApiMethods.POST, "https://localhost:8085", "/matchingResult", "application/json") - .withBody(component) - .withQueryParam("MatchingSuccessful", matchingSuccessful) - .withSuccessResponse[Unit](200) - .withErrorResponse[Unit](400) - .withErrorResponse[Unit](404) - /** - * - * - * Expected answers: - * code 200 : () - * code 400 : (Invalid status value) - * - * @param componentType Component to be fetched - */ - def matchInstance(componentType: String): ApiRequest[Unit] = - ApiRequest[Unit](ApiMethods.GET, "https://localhost:8085", "/matchingInstance", "application/json") - .withQueryParam("ComponentType", componentType) - .withSuccessResponse[Unit](200) - .withErrorResponse[Unit](400) - - -} - diff --git a/src/main/scala/de/upb/cs/swt/delphi/crawler/io/swagger/client/core/ApiInvoker.scala b/src/main/scala/de/upb/cs/swt/delphi/crawler/io/swagger/client/core/ApiInvoker.scala deleted file mode 100644 index 3be096c..0000000 --- a/src/main/scala/de/upb/cs/swt/delphi/crawler/io/swagger/client/core/ApiInvoker.scala +++ /dev/null @@ -1,329 +0,0 @@ -/** - * NOTE: This class is auto generated by the akka-scala (beta) swagger code generator program. - * https://github.com/swagger-api/swagger-codegen - * For any issue or feedback, please open a ticket via https://github.com/swagger-api/swagger-codegen/issues/new - */ - -package de.upb.cs.swt.delphi.crawler.io.swagger.client.core - -import java.io.File -import java.security.cert.X509Certificate -import javax.net.ssl._ - -import akka.actor.ActorSystem -import akka.io.IO -import akka.pattern.ask -import akka.util.Timeout -import org.joda.time.DateTime -import org.joda.time.format.ISODateTimeFormat -import org.json4s.JsonAST.JString -import org.json4s._ -import org.json4s.jackson.JsonMethods._ -import org.json4s.jackson.Serialization -import spray.can.Http -import spray.can.Http.HostConnectorSetup -import spray.client.pipelining -import spray.client.pipelining._ -import spray.http.HttpEncodings._ -import spray.http.HttpHeaders.{RawHeader, `Accept-Encoding`} -import spray.http.Uri.Query -import spray.http._ -import spray.http.parser.HttpParser -import spray.httpx.encoding.{Deflate, Encoder, Gzip} -import spray.httpx.unmarshalling._ -import spray.io.ClientSSLEngineProvider - -import scala.concurrent.{ExecutionContext, ExecutionContextExecutor, Future} -import scala.reflect.ClassTag -import scala.util.control.NonFatal - -object ApiInvoker { - - def apply()(implicit system: ActorSystem): ApiInvoker = - apply(DefaultFormats + DateTimeSerializer) - def apply(serializers: Iterable[Serializer[_]])(implicit system: ActorSystem): ApiInvoker = - apply(DefaultFormats + DateTimeSerializer ++ serializers) - def apply(formats: Formats)(implicit system: ActorSystem): ApiInvoker = new ApiInvoker(formats) - - case class CustomStatusCode(value: Int, reason: String = "Application-defined status code", isSuccess: Boolean = true) - - def addCustomStatusCode(code: CustomStatusCode): Unit = addCustomStatusCode(code.value, code.reason, code.isSuccess) - - def addCustomStatusCode(code: Int, reason: String = "Application defined code", isSuccess: Boolean = true): Unit = { - StatusCodes.getForKey(code) foreach { _ => - StatusCodes.registerCustom(code, reason, reason, isSuccess, allowsEntity = true) - } - } - - /** - * Allows request execution without calling apiInvoker.execute(request) - * request.response can be used to get a future of the ApiResponse generated. - * request.result can be used to get a future of the expected ApiResponse content. If content doesn't match, a - * Future will failed with a ClassCastException - * @param request the apiRequest to be executed - */ - implicit class ApiRequestImprovements[T](request: ApiRequest[T]) { - - def response(invoker: ApiInvoker)(implicit ec: ExecutionContext, system: ActorSystem): Future[ApiResponse[T]] = - response(ec, system, invoker) - - def response(implicit ec: ExecutionContext, system: ActorSystem, invoker: ApiInvoker): Future[ApiResponse[T]] = - invoker.execute(request) - - def result[U <: T](implicit c: ClassTag[U], ec: ExecutionContext, system: ActorSystem, invoker: ApiInvoker): Future[U] = - invoker.execute(request).map(_.content).mapTo[U] - - } - - /** - * Allows transformation from ApiMethod to spray HttpMethods - * @param method the ApiMethod to be converted - */ - implicit class ApiMethodExtensions(val method: ApiMethod) { - def toSprayMethod: HttpMethod = HttpMethods.getForKey(method.value).getOrElse(HttpMethods.GET) - } - - case object DateTimeSerializer extends CustomSerializer[DateTime](format => ( { - case JString(s) => - ISODateTimeFormat.dateOptionalTimeParser().parseDateTime(s) - }, { - case d: DateTime => - JString(ISODateTimeFormat.dateTime().print(d)) - })) -} - -class ApiInvoker(formats: Formats)(implicit system: ActorSystem) extends UntrustedSslContext with CustomContentTypes { - - import de.upb.cs.swt.delphi.crawler.io.swagger.client.core.ApiInvoker._ - import de.upb.cs.swt.delphi.crawler.io.swagger.client.core.ParametersMap._ - - - implicit val ec: ExecutionContextExecutor = system.dispatcher - implicit val jsonFormats: Formats = formats - - def settings = ApiSettings(system) - - import spray.http.MessagePredicate._ - - val CompressionFilter: MessagePredicate= MessagePredicate({ _ => settings.compressionEnabled}) && - Encoder.DefaultFilter && minEntitySize(settings.compressionSizeThreshold) - - settings.customCodes.foreach(addCustomStatusCode) - - private def addAuthentication(credentialsSeq: Seq[Credentials]): pipelining.RequestTransformer = - request => - credentialsSeq.foldLeft(request) { - case (req, BasicCredentials(login, password)) => - req ~> addCredentials(BasicHttpCredentials(login, password)) - case (req, ApiKeyCredentials(keyValue, keyName, ApiKeyLocations.HEADER)) => - req ~> addHeader(RawHeader(keyName, keyValue.value)) - case (req, _) => req - } - - private def addHeaders(headers: Map[String, Any]): pipelining.RequestTransformer = { request => - - val rawHeaders = for { - (name, value) <- headers.asFormattedParams - header = RawHeader(name, String.valueOf(value)) - } yield header - - request.withHeaders(rawHeaders.toList) - } - - private def bodyPart(name: String, value: Any): BodyPart = { - value match { - case f: File => - BodyPart(f, name) - case v: String => - BodyPart(HttpEntity(String.valueOf(v))) - case NumericValue(v) => - BodyPart(HttpEntity(String.valueOf(v))) - case m: ApiModel => - BodyPart(HttpEntity(Serialization.write(m))) - } - } - - private def formDataContent(request: ApiRequest[_]) = { - val params = request.formParams.asFormattedParams - if (params.isEmpty) - None - else - Some( - normalizedContentType(request.contentType).mediaType match { - case MediaTypes.`multipart/form-data` => - MultipartFormData(params.map { case (name, value) => (name, bodyPart(name, value))}) - case MediaTypes.`application/x-www-form-urlencoded` => - FormData(params.mapValues(String.valueOf)) - case m: MediaType => // Default : application/x-www-form-urlencoded. - FormData(params.mapValues(String.valueOf)) - } - ) - } - - private def bodyContent(request: ApiRequest[_]): Option[Any] = { - request.bodyParam.map(Extraction.decompose).map(compact) - } - - private def createRequest(uri: Uri, request: ApiRequest[_]): HttpRequest = { - - val builder = new RequestBuilder(request.method.toSprayMethod) - val httpRequest = request.method.toSprayMethod match { - case HttpMethods.GET | HttpMethods.DELETE => builder.apply(uri) - case HttpMethods.POST | HttpMethods.PUT => - formDataContent(request) orElse bodyContent(request) match { - case Some(c: FormData) => - builder.apply(uri, c) - case Some(c: MultipartFormData) => - builder.apply(uri, c) - case Some(c: String) => - builder.apply(uri, HttpEntity(normalizedContentType(request.contentType), c)) - case _ => - builder.apply(uri, HttpEntity(normalizedContentType(request.contentType), " ")) - } - case _ => builder.apply(uri) - } - - httpRequest ~> - addHeaders(request.headerParams) ~> - addAuthentication(request.credentials) ~> - encode(Gzip(CompressionFilter)) - } - - def makeQuery(r: ApiRequest[_]): Query = { - r.credentials.foldLeft(r.queryParams) { - case (params, ApiKeyCredentials(key, keyName, ApiKeyLocations.QUERY)) => - params + (keyName -> key.value) - case (params, _) => params - }.asFormattedParams - .mapValues(String.valueOf) - .foldRight[Query](Uri.Query.Empty) { - case ((name, value), acc) => acc.+:(name, value) - } - } - - def makeUri(r: ApiRequest[_]): Uri = { - val opPath = r.operationPath.replaceAll("\\{format\\}", "json") - val opPathWithParams = r.pathParams.asFormattedParams - .mapValues(String.valueOf) - .foldLeft(opPath) { - case (path, (name, value)) => path.replaceAll(s"\\{$name\\}", value) - } - val query = makeQuery(r) - - Uri(r.basePath + opPathWithParams).withQuery(query) - } - - def execute[T](r: ApiRequest[T]): Future[ApiResponse[T]] = { - try { - implicit val timeout: Timeout = settings.connectionTimeout - - val uri = makeUri(r) - - val connector = HostConnectorSetup( - uri.authority.host.toString, - uri.effectivePort, - sslEncryption = "https".equals(uri.scheme), - defaultHeaders = settings.defaultHeaders ++ List(`Accept-Encoding`(gzip, deflate))) - - val request = createRequest(uri, r) - - for { - Http.HostConnectorInfo(hostConnector, _) <- IO(Http) ? connector - response <- hostConnector.ask(request).mapTo[HttpResponse] - } yield { - response ~> decode(Deflate) ~> decode(Gzip) ~> unmarshallApiResponse(r) - } - } - catch { - case NonFatal(x) => Future.failed(x) - } - } - - def unmarshallApiResponse[T](request: ApiRequest[T])(response: HttpResponse): ApiResponse[T] = { - request.responseForCode(response.status.intValue) match { - case Some( (manifest: Manifest[T], state: ResponseState) ) => - entityUnmarshaller(manifest)(response.entity) match { - case Right(value) ⇒ - state match { - case ResponseState.Success => - ApiResponse(response.status.intValue, value, response.headers.map(header => (header.name, header.value)).toMap) - case ResponseState.Error => - throw ApiError(response.status.intValue, "Error response received", - Some(value), - headers = response.headers.map(header => (header.name, header.value)).toMap) - } - - case Left(MalformedContent(error, Some(cause))) ⇒ - throw ApiError(response.status.intValue, s"Unable to unmarshall content to [$manifest]", Some(response.entity.toString), cause) - - case Left(MalformedContent(error, None)) ⇒ - throw ApiError(response.status.intValue, s"Unable to unmarshall content to [$manifest]", Some(response.entity.toString)) - - case Left(ContentExpected) ⇒ - throw ApiError(response.status.intValue, s"Unable to unmarshall empty response to [$manifest]", Some(response.entity.toString)) - } - - case _ => throw ApiError(response.status.intValue, "Unexpected response code", Some(response.entity.toString)) - } - } - - def entityUnmarshaller[T](implicit mf: Manifest[T]): Unmarshaller[T] = - Unmarshaller[T](MediaTypes.`application/json`) { - case x: HttpEntity.NonEmpty ⇒ - parse(x.asString(defaultCharset = HttpCharsets.`UTF-8`)) - .noNulls - .camelizeKeys - .extract[T] - } - -} - -sealed trait CustomContentTypes { - - def normalizedContentType(original: String): ContentType = - MediaTypes.forExtension(original) map (ContentType(_)) getOrElse parseContentType(original) - - def parseContentType(contentType: String): ContentType = { - val contentTypeAsRawHeader = HttpHeaders.RawHeader("Content-Type", contentType) - val parsedContentTypeHeader = HttpParser.parseHeader(contentTypeAsRawHeader) - (parsedContentTypeHeader: @unchecked) match { - case Right(ct: HttpHeaders.`Content-Type`) => - ct.contentType - case Left(error: ErrorInfo) => - throw new IllegalArgumentException( - s"Error converting '$contentType' to a ContentType header: '${error.summary}'") - } - } -} - -sealed trait UntrustedSslContext { - this: ApiInvoker => - - implicit lazy val trustfulSslContext: SSLContext = { - settings.alwaysTrustCertificates match { - case false => - SSLContext.getDefault - - case true => - class IgnoreX509TrustManager extends X509TrustManager { - def checkClientTrusted(chain: Array[X509Certificate], authType: String): Unit = {} - - def checkServerTrusted(chain: Array[X509Certificate], authType: String): Unit = {} - - def getAcceptedIssuers = null - } - - val context = SSLContext.getInstance("TLS") - context.init(null, Array(new IgnoreX509TrustManager), null) - context - } - } - - implicit val clientSSLEngineProvider = - ClientSSLEngineProvider { - _ => - val engine = trustfulSslContext.createSSLEngine() - engine.setUseClientMode(true) - engine - } -} diff --git a/src/main/scala/de/upb/cs/swt/delphi/crawler/io/swagger/client/core/ApiRequest.scala b/src/main/scala/de/upb/cs/swt/delphi/crawler/io/swagger/client/core/ApiRequest.scala deleted file mode 100644 index 76e4a16..0000000 --- a/src/main/scala/de/upb/cs/swt/delphi/crawler/io/swagger/client/core/ApiRequest.scala +++ /dev/null @@ -1,59 +0,0 @@ -/** - * NOTE: This class is auto generated by the akka-scala (beta) swagger code generator program. - * https://github.com/swagger-api/swagger-codegen - * For any issue or feedback, please open a ticket via https://github.com/swagger-api/swagger-codegen/issues/new - */ -package de.upb.cs.swt.delphi.crawler.io.swagger.client.core - -sealed trait ResponseState - -object ResponseState { - - case object Success extends ResponseState - - case object Error extends ResponseState - -} - -case class ApiRequest[U]( - // required fields - method: ApiMethod, - basePath: String, - operationPath: String, - contentType: String, - - // optional fields - responses: Map[Int, (Manifest[_], ResponseState)] = Map.empty, - bodyParam: Option[Any] = None, - formParams: Map[String, Any] = Map.empty, - pathParams: Map[String, Any] = Map.empty, - queryParams: Map[String, Any] = Map.empty, - headerParams: Map[String, Any] = Map.empty, - credentials: Seq[Credentials] = List.empty) { - - def withCredentials(cred: Credentials): ApiRequest[U] = copy[U](credentials = credentials :+ cred) - - def withApiKey(key: ApiKeyValue, keyName: String, location: ApiKeyLocation): ApiRequest[U] = withCredentials(ApiKeyCredentials(key, keyName, location)) - - def withSuccessResponse[T](code: Int)(implicit m: Manifest[T]): ApiRequest[U] = copy[U](responses = responses + (code -> (m, ResponseState.Success))) - - def withErrorResponse[T](code: Int)(implicit m: Manifest[T]): ApiRequest[U] = copy[U](responses = responses + (code -> (m, ResponseState.Error))) - - def withDefaultSuccessResponse[T](implicit m: Manifest[T]): ApiRequest[U] = withSuccessResponse[T](0) - - def withDefaultErrorResponse[T](implicit m: Manifest[T]): ApiRequest[U] = withErrorResponse[T](0) - - def responseForCode(statusCode: Int): Option[(Manifest[_], ResponseState)] = responses.get(statusCode) orElse responses.get(0) - - def withoutBody(): ApiRequest[U] = copy[U](bodyParam = None) - - def withBody(body: Any): ApiRequest[U] = copy[U](bodyParam = Some(body)) - - def withFormParam(name: String, value: Any): ApiRequest[U] = copy[U](formParams = formParams + (name -> value)) - - def withPathParam(name: String, value: Any): ApiRequest[U] = copy[U](pathParams = pathParams + (name -> value)) - - def withQueryParam(name: String, value: Any): ApiRequest[U] = copy[U](queryParams = queryParams + (name -> value)) - - def withHeaderParam(name: String, value: Any): ApiRequest[U] = copy[U](headerParams = headerParams + (name -> value)) -} diff --git a/src/main/scala/de/upb/cs/swt/delphi/crawler/io/swagger/client/core/ApiSettings.scala b/src/main/scala/de/upb/cs/swt/delphi/crawler/io/swagger/client/core/ApiSettings.scala deleted file mode 100644 index 270c78d..0000000 --- a/src/main/scala/de/upb/cs/swt/delphi/crawler/io/swagger/client/core/ApiSettings.scala +++ /dev/null @@ -1,36 +0,0 @@ -/** - * NOTE: This class is auto generated by the akka-scala (beta) swagger code generator program. - * https://github.com/swagger-api/swagger-codegen - * For any issue or feedback, please open a ticket via https://github.com/swagger-api/swagger-codegen/issues/new - */ -package de.upb.cs.swt.delphi.crawler.io.swagger.client.core - -import java.util.concurrent.TimeUnit - -import akka.actor.{ExtendedActorSystem, Extension, ExtensionKey} -import com.typesafe.config.Config -import de.upb.cs.swt.delphi.crawler.io.swagger.client.core.ApiInvoker.CustomStatusCode -import spray.http.HttpHeaders.RawHeader - -import scala.collection.JavaConversions._ -import scala.concurrent.duration.FiniteDuration - -class ApiSettings(config: Config) extends Extension { - def this(system: ExtendedActorSystem) = this(system.settings.config) - - private def cfg = config.getConfig("io.swagger.client.apiRequest") - - val alwaysTrustCertificates: Boolean = cfg.getBoolean("trust-certificates") - val defaultHeaders: List[RawHeader] = cfg.getConfig("default-headers").entrySet.toList.map(c => RawHeader(c.getKey, c.getValue.render)) - val connectionTimeout = FiniteDuration(cfg.getDuration("connection-timeout", TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS) - val compressionEnabled: Boolean = cfg.getBoolean("compression.enabled") - val compressionSizeThreshold: Int = cfg.getBytes("compression.size-threshold").toInt - val customCodes: List[CustomStatusCode] = cfg.getConfigList("custom-codes").toList.map { c => - CustomStatusCode( - c.getInt("code"), - c.getString("reason"), - c.getBoolean("success")) - } -} - -object ApiSettings extends ExtensionKey[ApiSettings] \ No newline at end of file diff --git a/src/main/scala/de/upb/cs/swt/delphi/crawler/io/swagger/client/core/requests.scala b/src/main/scala/de/upb/cs/swt/delphi/crawler/io/swagger/client/core/requests.scala deleted file mode 100644 index 98e29c4..0000000 --- a/src/main/scala/de/upb/cs/swt/delphi/crawler/io/swagger/client/core/requests.scala +++ /dev/null @@ -1,187 +0,0 @@ -/** - * NOTE: This class is auto generated by the akka-scala (beta) swagger code generator program. - * https://github.com/swagger-api/swagger-codegen - * For any issue or feedback, please open a ticket via https://github.com/swagger-api/swagger-codegen/issues/new - */ -package de.upb.cs.swt.delphi.crawler.io.swagger.client.core - -import java.io.File -import java.net.URLEncoder - -import scala.util.Try - -sealed trait ApiReturnWithHeaders { - def headers: Map[String, String] - - def header(name: String): Option[String] = headers.get(name) - - def getStringHeader(name: String): Option[String] = header(name) - - // workaround: return date time header in string instead of datetime object - def getDateTimeHeader(name: String): Option[String] = header(name) - - def getIntHeader(name: String): Option[Int] = castedHeader(name, java.lang.Integer.parseInt) - - def getLongHeader(name: String): Option[Long] = castedHeader(name, java.lang.Long.parseLong) - - def getFloatHeader(name: String): Option[Float] = castedHeader(name, java.lang.Float.parseFloat) - - def getDoubleHeader(name: String): Option[Double] = castedHeader(name, java.lang.Double.parseDouble) - - def getBooleanHeader(name: String): Option[Boolean] = castedHeader(name, java.lang.Boolean.parseBoolean) - - private def castedHeader[U](name: String, conversion: String => U): Option[U] = { - Try { - header(name).map(conversion) - }.get - } -} - -sealed case class ApiResponse[T](code: Int, content: T, headers: Map[String, String] = Map.empty) - extends ApiReturnWithHeaders - -sealed case class ApiError[T](code: Int, message: String, responseContent: Option[T], cause: Throwable = null, headers: Map[String, String] = Map.empty) - extends Throwable(s"($code) $message.${responseContent.map(s => s" Content : $s").getOrElse("")}", cause) - with ApiReturnWithHeaders - -sealed case class ApiMethod(value: String) - -object ApiMethods { - val CONNECT = ApiMethod("CONNECT") - val DELETE = ApiMethod("DELETE") - val GET = ApiMethod("GET") - val HEAD = ApiMethod("HEAD") - val OPTIONS = ApiMethod("OPTIONS") - val PATCH = ApiMethod("PATCH") - val POST = ApiMethod("POST") - val PUT = ApiMethod("PUT") - val TRACE = ApiMethod("TRACE") -} - -/** - * This trait needs to be added to any model defined by the api. - */ -trait ApiModel - -/** - * Single trait defining a credential that can be transformed to a paramName / paramValue tupple - */ -sealed trait Credentials { - def asQueryParam: Option[(String, String)] = None -} - -sealed case class BasicCredentials(user: String, password: String) extends Credentials - -sealed case class ApiKeyCredentials(key: ApiKeyValue, keyName: String, location: ApiKeyLocation) extends Credentials { - override def asQueryParam: Option[(String, String)] = location match { - case ApiKeyLocations.QUERY => Some((keyName, key.value)) - case _ => None - } -} - -sealed case class ApiKeyValue(value: String) - -sealed trait ApiKeyLocation - -object ApiKeyLocations { - - case object QUERY extends ApiKeyLocation - - case object HEADER extends ApiKeyLocation - -} - - -/** - * Case class used to unapply numeric values only in pattern matching - * - * @param value the string representation of the numeric value - */ -sealed case class NumericValue(value: String) { - override def toString: String = value -} - -object NumericValue { - def unapply(n: Any): Option[NumericValue] = n match { - case (_: Int | _: Long | _: Float | _: Double | _: Boolean | _: Byte) => Some(NumericValue(String.valueOf(n))) - case _ => None - } -} - -/** - * Used for params being arrays - */ -sealed case class ArrayValues(values: Seq[Any], format: CollectionFormat = CollectionFormats.CSV) - -object ArrayValues { - def apply(values: Option[Seq[Any]], format: CollectionFormat): ArrayValues = - ArrayValues(values.getOrElse(Seq.empty), format) - - def apply(values: Option[Seq[Any]]): ArrayValues = ArrayValues(values, CollectionFormats.CSV) -} - - -/** - * Defines how arrays should be rendered in query strings. - */ -sealed trait CollectionFormat - -trait MergedArrayFormat extends CollectionFormat { - def separator: String -} - -object CollectionFormats { - - case object CSV extends MergedArrayFormat { - override val separator = "," - } - - case object TSV extends MergedArrayFormat { - override val separator = "\t" - } - - case object SSV extends MergedArrayFormat { - override val separator = " " - } - - case object PIPES extends MergedArrayFormat { - override val separator = "|" - } - - case object MULTI extends CollectionFormat - -} - -object ParametersMap { - - /** - * Pimp parameters maps (Map[String, Any]) in order to transform them in a sequence of String -> Any tupples, - * with valid url-encoding, arrays handling, files preservation, ... - */ - implicit class ParametersMapImprovements(val m: Map[String, Any]) { - - def asFormattedParamsList: List[(String, Any)] = m.toList.flatMap(formattedParams) - - def asFormattedParams: Map[String, Any] = m.flatMap(formattedParams) - - private def urlEncode(v: Any) = URLEncoder.encode(String.valueOf(v), "utf-8").replaceAll("\\+", "%20") - - private def formattedParams(tuple: (String, Any)): Seq[(String, Any)] = formattedParams(tuple._1, tuple._2) - - private def formattedParams(name: String, value: Any): Seq[(String, Any)] = value match { - case arr: ArrayValues => - arr.format match { - case CollectionFormats.MULTI => arr.values.flatMap(formattedParams(name, _)) - case format: MergedArrayFormat => Seq((name, arr.values.mkString(format.separator))) - } - case None => Seq.empty - case Some(opt) => formattedParams(name, opt) - case s: Seq[Any] => formattedParams(name, ArrayValues(s)) - case v: String => Seq((name, urlEncode(v))) - case NumericValue(v) => Seq((name, urlEncode(v))) - case f: File => Seq((name, f)) - case m: ApiModel => Seq((name, m)) - } - } - -} diff --git a/src/main/scala/de/upb/cs/swt/delphi/crawler/io/swagger/client/model/Instance.scala b/src/main/scala/de/upb/cs/swt/delphi/crawler/io/swagger/client/model/Instance.scala index f12e71a..17c8f3b 100644 --- a/src/main/scala/de/upb/cs/swt/delphi/crawler/io/swagger/client/model/Instance.scala +++ b/src/main/scala/de/upb/cs/swt/delphi/crawler/io/swagger/client/model/Instance.scala @@ -7,7 +7,6 @@ package de.upb.cs.swt.delphi.crawler.io.swagger.client.model import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport -import de.upb.cs.swt.delphi.crawler.io.swagger.client.core.ApiModel import spray.json._ trait JsonSupport extends SprayJsonSupport with DefaultJsonProtocol { @@ -35,7 +34,7 @@ final case class Instance ( name: Option[String], /* Component Type */ componentType: Option[InstanceEnums.ComponentType] -) extends ApiModel +) object InstanceEnums { diff --git a/src/main/scala/de/upb/cs/swt/delphi/crawler/io/swagger/client/model/InstanceID.scala b/src/main/scala/de/upb/cs/swt/delphi/crawler/io/swagger/client/model/InstanceID.scala deleted file mode 100644 index 020f159..0000000 --- a/src/main/scala/de/upb/cs/swt/delphi/crawler/io/swagger/client/model/InstanceID.scala +++ /dev/null @@ -1,15 +0,0 @@ -/** - * NOTE: This class is auto generated by the akka-scala (beta) swagger code generator program. - * https://github.com/swagger-api/swagger-codegen - * For any issue or feedback, please open a ticket via https://github.com/swagger-api/swagger-codegen/issues/new - */ - -package de.upb.cs.swt.delphi.crawler.io.swagger.client.model - -import de.upb.cs.swt.delphi.crawler.io.swagger.client.core.ApiModel - -case class InstanceID ( - iD: Option[Long] -) extends ApiModel - - From 3439480c5bedad361703629e2a4d392de103444e Mon Sep 17 00:00:00 2001 From: Johannes Duesing Date: Tue, 28 Aug 2018 16:24:14 +0200 Subject: [PATCH 06/20] Crawler can now get elastic search ip from IR Code Cleanup, API extensions --- .../cs/swt/delphi/crawler/Configuration.scala | 5 +- .../instancemanagement/InstanceRegistry.scala | 76 +++++++++++++++---- 2 files changed, 63 insertions(+), 18 deletions(-) diff --git a/src/main/scala/de/upb/cs/swt/delphi/crawler/Configuration.scala b/src/main/scala/de/upb/cs/swt/delphi/crawler/Configuration.scala index 1f77610..e5e708d 100644 --- a/src/main/scala/de/upb/cs/swt/delphi/crawler/Configuration.scala +++ b/src/main/scala/de/upb/cs/swt/delphi/crawler/Configuration.scala @@ -20,9 +20,10 @@ class Configuration { val controlServerPort : Int = 8882 val throttle : Throttle = Throttle(10, 10 millis, 10, ThrottleMode.shaping) val limit : Int = 50 - val instanceRegistryUri : String = sys.env.getOrElse("DELPHI_IR_URI", "http://localhost:9500") - lazy val usingInstanceRegistry = InstanceRegistry.register("MyCrawlerInstance",this) + val instanceName = "MyCrawlerInstance" + val instanceRegistryUri : String = sys.env.getOrElse("DELPHI_IR_URI", "http://localhost:9500") + lazy val usingInstanceRegistry = InstanceRegistry.register(this) case class Throttle(element : Int, per : FiniteDuration, maxBurst : Int, mode : ThrottleMode) } diff --git a/src/main/scala/de/upb/cs/swt/delphi/crawler/instancemanagement/InstanceRegistry.scala b/src/main/scala/de/upb/cs/swt/delphi/crawler/instancemanagement/InstanceRegistry.scala index 1018fb9..d14af6c 100644 --- a/src/main/scala/de/upb/cs/swt/delphi/crawler/instancemanagement/InstanceRegistry.scala +++ b/src/main/scala/de/upb/cs/swt/delphi/crawler/instancemanagement/InstanceRegistry.scala @@ -1,30 +1,32 @@ package de.upb.cs.swt.delphi.crawler.instancemanagement -import akka.actor.ActorSystem +import java.net.InetAddress + import akka.http.scaladsl.Http import akka.http.scaladsl.marshalling.Marshal import akka.http.scaladsl.model._ +import akka.http.scaladsl.unmarshalling.Unmarshal import de.upb.cs.swt.delphi.crawler.{AppLogging, Configuration, Crawler} import de.upb.cs.swt.delphi.crawler.io.swagger.client.model.InstanceEnums.ComponentType import de.upb.cs.swt.delphi.crawler.io.swagger.client.model.{Instance, JsonSupport} import scala.concurrent.duration.Duration -import scala.concurrent.{Await, ExecutionContext, Future} +import scala.concurrent.{Await, Future} import scala.util.{Failure, Success, Try} object InstanceRegistry extends JsonSupport with AppLogging { - def register(Name: String, configuration: Configuration) : Boolean = { - val instance = Instance(None, None, Option(configuration.controlServerPort), Option(Name), Option(ComponentType.Crawler)) + implicit val system = Crawler.system + implicit val ec = system.dispatcher + implicit val materializer = Crawler.materializer + + def register(configuration: Configuration) : Boolean = { - implicit val system = Crawler.system - implicit val ec = system.dispatcher - implicit val materializer = Crawler.materializer + val instance = createInstance(None,configuration.controlServerPort, configuration.instanceName) - Await.result(postInstance(instance, configuration.instanceRegistryUri) map {response => - if(response.status == StatusCodes.OK || response.status == StatusCodes.Accepted) - { + Await.result(postInstance(instance, configuration.instanceRegistryUri + "/register") map {response => + if(response.status == StatusCodes.OK){ log.info("Successfully registered at Instance Registry.") true } @@ -41,21 +43,63 @@ object InstanceRegistry extends JsonSupport with AppLogging } def retrieveElasticSearchInstance(configuration: Configuration) : Try[String] = { - if(!configuration.usingInstanceRegistry) Failure - //TODO: Call generated API here - Failure(new Exception()) + if(!configuration.usingInstanceRegistry) Failure(new RuntimeException("Cannot get ElasticSearch instance from Instance Registry, no Instance Registry available.")) + else { + val request = HttpRequest(method = HttpMethods.GET, configuration.instanceRegistryUri + "/matchingInstance?ComponentType=Crawler") + + Await.result(Http(system).singleRequest(request) map {response => + val status = response.status + if(status == StatusCodes.OK) { + + Await.result(Unmarshal(response.entity).to[Instance] map {instance => + val elasticIP = instance.iP + log.info(s"Instance Registry assigned ElasticSearch instance at $elasticIP") + Success(elasticIP.get) + } recover {case ex => + log.warning(s"Failed to read response from Instance Registry, exception: $ex") + Failure(ex) + }, Duration.Inf) + } + else{ + log.warning(s"Failed to read response from Instance Registry, server returned $status") + Failure(new RuntimeException(s"Failed to read response from Instance Registry, server returned $status")) + } + } recover { case ex => + log.warning(s"Failed to request ElasticSearch instance from Instance Registry, exception: $ex ") + Failure(ex) + }, Duration.Inf) + } } def sendMatchingResult(isElasticSearchReachable : Boolean, configuration: Configuration) : Try[Unit] = { if(!configuration.usingInstanceRegistry) Failure - //TODO: Call generated API here - Success() + //TODO: Use ID! + val instance = createInstance(None,configuration.controlServerPort, configuration.instanceName) + + + Await.result(postInstance(instance, configuration.instanceRegistryUri + "/matchingResult?MatchingSuccessful=" + isElasticSearchReachable.toString) map {response => + if(response.status == StatusCodes.OK){ + log.info("Successfully posted matching result to Instance Registry.") + Success() + } + else { + val statuscode = response.status + log.warning(s"Failed to post matching result to Instance Registry, server returned $statuscode") + Failure(new RuntimeException(s"Failed to post matching result to Instance Registry, server returned $statuscode")) + } + + } recover {case ex => + log.warning(s"Failed to post matching result tot Instance Registry, exception: $ex") + Failure(new RuntimeException(s"Failed to post matching result tot Instance Registry, exception: $ex")) + }, Duration.Inf) } - def postInstance(instance : Instance, uri: String) (implicit system: ActorSystem, ec : ExecutionContext) : Future[HttpResponse] = + def postInstance(instance : Instance, uri: String) () : Future[HttpResponse] = Marshal(instance).to[RequestEntity] flatMap { entity => val request = HttpRequest(method = HttpMethods.POST, uri = uri, entity = entity) Http(system).singleRequest(request) } + + private def createInstance(id: Option[Long], controlPort : Int, name : String) : Instance = Instance(id, Option(InetAddress.getLocalHost().getHostAddress()), Option(controlPort), Option(name), Option(ComponentType.Crawler)) } From bf368e8c3d5e7d40e59d37d26c9f2c0e05c0684f Mon Sep 17 00:00:00 2001 From: Johannes Duesing Date: Sat, 1 Sep 2018 13:34:31 +0200 Subject: [PATCH 07/20] Cleaned up unused dependencies, fixed some minor bugs --- build.sbt | 7 +++---- .../scala/de/upb/cs/swt/delphi/crawler/Configuration.scala | 2 +- .../crawler/instancemanagement/InstanceRegistry.scala | 4 ++-- 3 files changed, 6 insertions(+), 7 deletions(-) diff --git a/build.sbt b/build.sbt index 032feff..e51c4bf 100644 --- a/build.sbt +++ b/build.sbt @@ -33,12 +33,11 @@ libraryDependencies ++= Seq( "com.typesafe.akka" %% "akka-stream-testkit" % akkaVersion % Test, "com.typesafe.akka" %% "akka-slf4j" % akkaVersion, "com.typesafe.akka" %% "akka-http" % "10.1.3", - "com.typesafe.akka" %% "akka-http-spray-json" % "10.0.8", - "org.json4s" %% "json4s-jackson" % "3.5.3", - "io.swagger" % "swagger-core" % "1.5.15", - "io.spray" % "spray-client" % "1.3.1" + "com.typesafe.akka" %% "akka-http-spray-json" % "10.0.8" ) +libraryDependencies += "org.json4s" %% "json4s-jackson" % "3.5.3" + libraryDependencies += "ch.qos.logback" % "logback-classic" % "1.1.3" % Runtime val elastic4sVersion = "6.3.0" diff --git a/src/main/scala/de/upb/cs/swt/delphi/crawler/Configuration.scala b/src/main/scala/de/upb/cs/swt/delphi/crawler/Configuration.scala index e5e708d..096bad2 100644 --- a/src/main/scala/de/upb/cs/swt/delphi/crawler/Configuration.scala +++ b/src/main/scala/de/upb/cs/swt/delphi/crawler/Configuration.scala @@ -22,7 +22,7 @@ class Configuration { val limit : Int = 50 val instanceName = "MyCrawlerInstance" - val instanceRegistryUri : String = sys.env.getOrElse("DELPHI_IR_URI", "http://localhost:9500") + val instanceRegistryUri : String = sys.env.getOrElse("DELPHI_IR_URI", "http://localhost:8085") lazy val usingInstanceRegistry = InstanceRegistry.register(this) case class Throttle(element : Int, per : FiniteDuration, maxBurst : Int, mode : ThrottleMode) diff --git a/src/main/scala/de/upb/cs/swt/delphi/crawler/instancemanagement/InstanceRegistry.scala b/src/main/scala/de/upb/cs/swt/delphi/crawler/instancemanagement/InstanceRegistry.scala index d14af6c..733b3e5 100644 --- a/src/main/scala/de/upb/cs/swt/delphi/crawler/instancemanagement/InstanceRegistry.scala +++ b/src/main/scala/de/upb/cs/swt/delphi/crawler/instancemanagement/InstanceRegistry.scala @@ -72,7 +72,7 @@ object InstanceRegistry extends JsonSupport with AppLogging } def sendMatchingResult(isElasticSearchReachable : Boolean, configuration: Configuration) : Try[Unit] = { - if(!configuration.usingInstanceRegistry) Failure + if(!configuration.usingInstanceRegistry) Failure(new RuntimeException("Cannot get ElasticSearch instance from Instance Registry, no Instance Registry available.")) //TODO: Use ID! val instance = createInstance(None,configuration.controlServerPort, configuration.instanceName) @@ -89,7 +89,7 @@ object InstanceRegistry extends JsonSupport with AppLogging } } recover {case ex => - log.warning(s"Failed to post matching result tot Instance Registry, exception: $ex") + log.warning(s"Failed to post matching result to Instance Registry, exception: $ex") Failure(new RuntimeException(s"Failed to post matching result tot Instance Registry, exception: $ex")) }, Duration.Inf) } From ba109f650466f626ab4e29083eaf476ca2ab39e4 Mon Sep 17 00:00:00 2001 From: Johannes Duesing Date: Sat, 1 Sep 2018 16:44:48 +0200 Subject: [PATCH 08/20] Crawler now reads its assigned IP after registration at the Instance Registry. --- .../instancemanagement/InstanceRegistry.scala | 19 ++++++++++++++----- .../io/swagger/client/model/Instance.scala | 2 ++ 2 files changed, 16 insertions(+), 5 deletions(-) diff --git a/src/main/scala/de/upb/cs/swt/delphi/crawler/instancemanagement/InstanceRegistry.scala b/src/main/scala/de/upb/cs/swt/delphi/crawler/instancemanagement/InstanceRegistry.scala index 733b3e5..0614f61 100644 --- a/src/main/scala/de/upb/cs/swt/delphi/crawler/instancemanagement/InstanceRegistry.scala +++ b/src/main/scala/de/upb/cs/swt/delphi/crawler/instancemanagement/InstanceRegistry.scala @@ -27,8 +27,14 @@ object InstanceRegistry extends JsonSupport with AppLogging Await.result(postInstance(instance, configuration.instanceRegistryUri + "/register") map {response => if(response.status == StatusCodes.OK){ - log.info("Successfully registered at Instance Registry.") - true + Await.result(Unmarshal(response.entity).to[String] map { assignedID => + val id = assignedID.toLong + log.info(s"Successfully registered at Instance Registry, got ID $id.") + true + } recover { case ex => + log.warning(s"Failed to read assigned ID from Instance Registry, exception: $ex") + false + }, Duration.Inf) } else { val statuscode = response.status @@ -45,7 +51,7 @@ object InstanceRegistry extends JsonSupport with AppLogging def retrieveElasticSearchInstance(configuration: Configuration) : Try[String] = { if(!configuration.usingInstanceRegistry) Failure(new RuntimeException("Cannot get ElasticSearch instance from Instance Registry, no Instance Registry available.")) else { - val request = HttpRequest(method = HttpMethods.GET, configuration.instanceRegistryUri + "/matchingInstance?ComponentType=Crawler") + val request = HttpRequest(method = HttpMethods.GET, configuration.instanceRegistryUri + "/matchingInstance?ComponentType=ElasticSearch") Await.result(Http(system).singleRequest(request) map {response => val status = response.status @@ -53,8 +59,11 @@ object InstanceRegistry extends JsonSupport with AppLogging Await.result(Unmarshal(response.entity).to[Instance] map {instance => val elasticIP = instance.iP - log.info(s"Instance Registry assigned ElasticSearch instance at $elasticIP") - Success(elasticIP.get) + log.info(s"Instance Registry assigned ElasticSearch instance at ${elasticIP.getOrElse("None")}") + elasticIP match { + case Some(ip) => Success(ip + ":" + instance.portnumber.get) + case None => Failure(new RuntimeException("Response from Instance Registry did not contain an IP")) + } } recover {case ex => log.warning(s"Failed to read response from Instance Registry, exception: $ex") Failure(ex) diff --git a/src/main/scala/de/upb/cs/swt/delphi/crawler/io/swagger/client/model/Instance.scala b/src/main/scala/de/upb/cs/swt/delphi/crawler/io/swagger/client/model/Instance.scala index 17c8f3b..a46d67c 100644 --- a/src/main/scala/de/upb/cs/swt/delphi/crawler/io/swagger/client/model/Instance.scala +++ b/src/main/scala/de/upb/cs/swt/delphi/crawler/io/swagger/client/model/Instance.scala @@ -19,6 +19,7 @@ trait JsonSupport extends SprayJsonSupport with DefaultJsonProtocol { case "WebApi" => InstanceEnums.ComponentType.WebApi case "WebApp" => InstanceEnums.ComponentType.WebApp case "DelphiManagement" => InstanceEnums.ComponentType.DelphiManagement + case "ElasticSearch" => InstanceEnums.ComponentType.ElasticSearch case x => throw new RuntimeException(s"Unexpected string value $x for component type.") } case y => throw new RuntimeException(s"Unexpected type $y while deserializing component type.") @@ -44,6 +45,7 @@ object InstanceEnums { val WebApi = Value("WebApi") val WebApp = Value("WebApp") val DelphiManagement = Value("DelphiManagement") + val ElasticSearch = Value("ElasticSearch") } } From c67ce574cb255b2ae40ce2d05580ce24b47a86f0 Mon Sep 17 00:00:00 2001 From: Johannes Duesing Date: Sun, 2 Sep 2018 22:51:32 +0200 Subject: [PATCH 09/20] Made posting matching-result work by storing the matched ElasticSearch instance in the configuration. --- .../cs/swt/delphi/crawler/Configuration.scala | 16 +++++++++++++--- .../instancemanagement/InstanceRegistry.scala | 16 +++++++--------- 2 files changed, 20 insertions(+), 12 deletions(-) diff --git a/src/main/scala/de/upb/cs/swt/delphi/crawler/Configuration.scala b/src/main/scala/de/upb/cs/swt/delphi/crawler/Configuration.scala index 096bad2..5a4ae39 100644 --- a/src/main/scala/de/upb/cs/swt/delphi/crawler/Configuration.scala +++ b/src/main/scala/de/upb/cs/swt/delphi/crawler/Configuration.scala @@ -5,17 +5,27 @@ import java.net.URI import akka.stream.ThrottleMode import com.sksamuel.elastic4s.ElasticsearchClientUri import de.upb.cs.swt.delphi.crawler.instancemanagement.InstanceRegistry +import de.upb.cs.swt.delphi.crawler.io.swagger.client.model.Instance +import de.upb.cs.swt.delphi.crawler.io.swagger.client.model.InstanceEnums.ComponentType import scala.concurrent.duration._ import scala.util.{Failure, Success} class Configuration { - lazy val elasticsearchClientUri: ElasticsearchClientUri = ElasticsearchClientUri(InstanceRegistry.retrieveElasticSearchInstance(this) match { - case Success(elasticIP) => elasticIP - case Failure(_) => sys.env.getOrElse("DELPHI_ELASTIC_URI","elasticsearch://localhost:9200") + lazy val elasticsearchClientUri: ElasticsearchClientUri = ElasticsearchClientUri({ + if(elasticsearchInstance.portnumber.isEmpty){ + elasticsearchInstance.iP.get + }else{ + elasticsearchInstance.iP.get + ":" + elasticsearchInstance.portnumber.get + } }) + lazy val elasticsearchInstance : Instance = InstanceRegistry.retrieveElasticSearchInstance(this) match { + case Success(instance) => instance + case Failure(_) => Instance(None, Some(sys.env.getOrElse("DELPHI_ELASTIC_URI","elasticsearch://localhost:9200")), None, Some("Default ElasticSearch instance"), Some(ComponentType.ElasticSearch) ) + } + val mavenRepoBase: URI = new URI("http://repo1.maven.org/maven2/") // TODO: Create a local demo server "http://localhost:8881/maven2/" val controlServerPort : Int = 8882 val throttle : Throttle = Throttle(10, 10 millis, 10, ThrottleMode.shaping) diff --git a/src/main/scala/de/upb/cs/swt/delphi/crawler/instancemanagement/InstanceRegistry.scala b/src/main/scala/de/upb/cs/swt/delphi/crawler/instancemanagement/InstanceRegistry.scala index 0614f61..db68590 100644 --- a/src/main/scala/de/upb/cs/swt/delphi/crawler/instancemanagement/InstanceRegistry.scala +++ b/src/main/scala/de/upb/cs/swt/delphi/crawler/instancemanagement/InstanceRegistry.scala @@ -48,7 +48,7 @@ object InstanceRegistry extends JsonSupport with AppLogging }, Duration.Inf) } - def retrieveElasticSearchInstance(configuration: Configuration) : Try[String] = { + def retrieveElasticSearchInstance(configuration: Configuration) : Try[Instance] = { if(!configuration.usingInstanceRegistry) Failure(new RuntimeException("Cannot get ElasticSearch instance from Instance Registry, no Instance Registry available.")) else { val request = HttpRequest(method = HttpMethods.GET, configuration.instanceRegistryUri + "/matchingInstance?ComponentType=ElasticSearch") @@ -60,10 +60,7 @@ object InstanceRegistry extends JsonSupport with AppLogging Await.result(Unmarshal(response.entity).to[Instance] map {instance => val elasticIP = instance.iP log.info(s"Instance Registry assigned ElasticSearch instance at ${elasticIP.getOrElse("None")}") - elasticIP match { - case Some(ip) => Success(ip + ":" + instance.portnumber.get) - case None => Failure(new RuntimeException("Response from Instance Registry did not contain an IP")) - } + Success(instance) } recover {case ex => log.warning(s"Failed to read response from Instance Registry, exception: $ex") Failure(ex) @@ -81,12 +78,13 @@ object InstanceRegistry extends JsonSupport with AppLogging } def sendMatchingResult(isElasticSearchReachable : Boolean, configuration: Configuration) : Try[Unit] = { - if(!configuration.usingInstanceRegistry) Failure(new RuntimeException("Cannot get ElasticSearch instance from Instance Registry, no Instance Registry available.")) - //TODO: Use ID! - val instance = createInstance(None,configuration.controlServerPort, configuration.instanceName) + if(!configuration.usingInstanceRegistry) Failure(new RuntimeException("Cannot post matching result to Instance Registry, no Instance Registry available.")) + if(configuration.elasticsearchInstance.iD.isEmpty) Failure(new RuntimeException("Cannot post matching result to Instance Registry, assigned ElasticSearch instance has no ID.")) + val IdToPost = configuration.elasticsearchInstance.iD.get + val request = HttpRequest(method = HttpMethods.POST, configuration.instanceRegistryUri + s"/matchingResult?Id=$IdToPost&MatchingSuccessful=$isElasticSearchReachable") - Await.result(postInstance(instance, configuration.instanceRegistryUri + "/matchingResult?MatchingSuccessful=" + isElasticSearchReachable.toString) map {response => + Await.result(Http(system).singleRequest(request) map {response => if(response.status == StatusCodes.OK){ log.info("Successfully posted matching result to Instance Registry.") Success() From 51fe0b1d418dee0d0d12bc8aa5a66da4980acc4b Mon Sep 17 00:00:00 2001 From: Johannes Duesing Date: Tue, 4 Sep 2018 19:08:16 +0200 Subject: [PATCH 10/20] Crawler is now deregistering itself from the Instance Registry on shutdown / exception during preflight checks. --- .../cs/swt/delphi/crawler/Configuration.scala | 11 ++++++- .../upb/cs/swt/delphi/crawler/Crawler.scala | 3 ++ .../instancemanagement/InstanceRegistry.scala | 33 ++++++++++++++++--- 3 files changed, 41 insertions(+), 6 deletions(-) diff --git a/src/main/scala/de/upb/cs/swt/delphi/crawler/Configuration.scala b/src/main/scala/de/upb/cs/swt/delphi/crawler/Configuration.scala index 5a4ae39..c07765a 100644 --- a/src/main/scala/de/upb/cs/swt/delphi/crawler/Configuration.scala +++ b/src/main/scala/de/upb/cs/swt/delphi/crawler/Configuration.scala @@ -33,7 +33,16 @@ class Configuration { val instanceName = "MyCrawlerInstance" val instanceRegistryUri : String = sys.env.getOrElse("DELPHI_IR_URI", "http://localhost:8085") - lazy val usingInstanceRegistry = InstanceRegistry.register(this) + + lazy val usingInstanceRegistry = assignedID match { + case Some(_) => true + case _ => false + } + + lazy val assignedID = InstanceRegistry.register(this) match { + case Success(id) => Some(id) + case Failure(_) => None + } case class Throttle(element : Int, per : FiniteDuration, maxBurst : Int, mode : ThrottleMode) } diff --git a/src/main/scala/de/upb/cs/swt/delphi/crawler/Crawler.scala b/src/main/scala/de/upb/cs/swt/delphi/crawler/Crawler.scala index 63efdf0..3768667 100644 --- a/src/main/scala/de/upb/cs/swt/delphi/crawler/Crawler.scala +++ b/src/main/scala/de/upb/cs/swt/delphi/crawler/Crawler.scala @@ -6,6 +6,7 @@ import com.sksamuel.elastic4s.http.HttpClient import de.upb.cs.swt.delphi.crawler.control.Server import de.upb.cs.swt.delphi.crawler.discovery.maven.MavenCrawlActor import de.upb.cs.swt.delphi.crawler.discovery.maven.MavenCrawlActor.Start +import de.upb.cs.swt.delphi.crawler.instancemanagement.InstanceRegistry import de.upb.cs.swt.delphi.crawler.preprocessing.PreprocessingDispatchActor import de.upb.cs.swt.delphi.crawler.processing.ProcessingDispatchActor import de.upb.cs.swt.delphi.crawler.storage.ElasticActor @@ -29,6 +30,7 @@ object Crawler extends App with AppLogging { sys.addShutdownHook(() => { log.warning("Received shutdown signal.") + InstanceRegistry.deregister(configuration) val future = system.terminate() Await.result(future, 120.seconds) }) @@ -38,6 +40,7 @@ object Crawler extends App with AppLogging { Startup.preflightCheck(configuration) match { case Success(c) => case Failure(e) => { + InstanceRegistry.deregister(configuration) system.terminate() sys.exit(1) } diff --git a/src/main/scala/de/upb/cs/swt/delphi/crawler/instancemanagement/InstanceRegistry.scala b/src/main/scala/de/upb/cs/swt/delphi/crawler/instancemanagement/InstanceRegistry.scala index db68590..1e72848 100644 --- a/src/main/scala/de/upb/cs/swt/delphi/crawler/instancemanagement/InstanceRegistry.scala +++ b/src/main/scala/de/upb/cs/swt/delphi/crawler/instancemanagement/InstanceRegistry.scala @@ -21,8 +21,8 @@ object InstanceRegistry extends JsonSupport with AppLogging implicit val ec = system.dispatcher implicit val materializer = Crawler.materializer - def register(configuration: Configuration) : Boolean = { + def register(configuration: Configuration) : Try[Long] = { val instance = createInstance(None,configuration.controlServerPort, configuration.instanceName) Await.result(postInstance(instance, configuration.instanceRegistryUri + "/register") map {response => @@ -30,21 +30,21 @@ object InstanceRegistry extends JsonSupport with AppLogging Await.result(Unmarshal(response.entity).to[String] map { assignedID => val id = assignedID.toLong log.info(s"Successfully registered at Instance Registry, got ID $id.") - true + Success(id) } recover { case ex => log.warning(s"Failed to read assigned ID from Instance Registry, exception: $ex") - false + Failure(ex) }, Duration.Inf) } else { val statuscode = response.status log.warning(s"Failed to register at Instance Registry, server returned $statuscode") - false + Failure(new RuntimeException(s"Failed to register at Instance Registry, server returned $statuscode")) } } recover {case ex => log.warning(s"Failed to register at Instance Registry, exception: $ex") - false + Failure(ex) }, Duration.Inf) } @@ -101,6 +101,29 @@ object InstanceRegistry extends JsonSupport with AppLogging }, Duration.Inf) } + def deregister(configuration: Configuration) : Try[Unit] = { + if(!configuration.usingInstanceRegistry) Failure(new RuntimeException("Cannot deregister from Instance Registry, no Instance Registry available.")) + val id : Long = configuration.assignedID.get + + val request = HttpRequest(method = HttpMethods.POST, configuration.instanceRegistryUri + s"/deregister?Id=$id") + + Await.result(Http(system).singleRequest(request) map {response => + if(response.status == StatusCodes.OK){ + log.info("Successfully deregistered Instance Registry.") + Success() + } + else { + val statuscode = response.status + log.warning(s"Failed to deregister from Instance Registry, server returned $statuscode") + Failure(new RuntimeException(s"Failed to deregister from Instance Registry, server returned $statuscode")) + } + + } recover {case ex => + log.warning(s"Failed to deregister to Instance Registry, exception: $ex") + Failure(ex) + }, Duration.Inf) + } + def postInstance(instance : Instance, uri: String) () : Future[HttpResponse] = Marshal(instance).to[RequestEntity] flatMap { entity => val request = HttpRequest(method = HttpMethods.POST, uri = uri, entity = entity) From 0d6b1a9a979bdba3e5021d586229fda188960cc8 Mon Sep 17 00:00:00 2001 From: Johannes Duesing Date: Wed, 5 Sep 2018 12:56:15 +0200 Subject: [PATCH 11/20] Code style cleanup --- .../cs/swt/delphi/crawler/Configuration.scala | 11 ++++++++--- .../instancemanagement/InstanceRegistry.scala | 17 +++++++++++------ .../io/swagger/client/model/Instance.scala | 2 +- .../ElasticReachablePreflightCheck.scala | 8 ++++---- 4 files changed, 24 insertions(+), 14 deletions(-) diff --git a/src/main/scala/de/upb/cs/swt/delphi/crawler/Configuration.scala b/src/main/scala/de/upb/cs/swt/delphi/crawler/Configuration.scala index 02e88f2..5e2c746 100644 --- a/src/main/scala/de/upb/cs/swt/delphi/crawler/Configuration.scala +++ b/src/main/scala/de/upb/cs/swt/delphi/crawler/Configuration.scala @@ -39,7 +39,12 @@ class Configuration { lazy val elasticsearchInstance : Instance = InstanceRegistry.retrieveElasticSearchInstance(this) match { case Success(instance) => instance - case Failure(_) => Instance(None, Some(sys.env.getOrElse("DELPHI_ELASTIC_URI","elasticsearch://localhost:9200")), None, Some("Default ElasticSearch instance"), Some(ComponentType.ElasticSearch) ) + case Failure(_) => Instance( + None, + Some(sys.env.getOrElse("DELPHI_ELASTIC_URI","elasticsearch://localhost:9200")), + None, + Some("Default ElasticSearch instance"), + Some(ComponentType.ElasticSearch)) } val mavenRepoBase: URI = new URI("http://repo1.maven.org/maven2/") // TODO: Create a local demo server "http://localhost:8881/maven2/" @@ -55,12 +60,12 @@ class Configuration { val instanceName = "MyCrawlerInstance" val instanceRegistryUri : String = sys.env.getOrElse("DELPHI_IR_URI", "http://localhost:8085") - lazy val usingInstanceRegistry = assignedID match { + lazy val usingInstanceRegistry : Boolean = assignedID match { case Some(_) => true case None => false } - lazy val assignedID = InstanceRegistry.register(this) match { + lazy val assignedID : Option[Long] = InstanceRegistry.register(this) match { case Success(id) => Some(id) case Failure(_) => None } diff --git a/src/main/scala/de/upb/cs/swt/delphi/crawler/instancemanagement/InstanceRegistry.scala b/src/main/scala/de/upb/cs/swt/delphi/crawler/instancemanagement/InstanceRegistry.scala index 9ae6d66..7cc8c50 100644 --- a/src/main/scala/de/upb/cs/swt/delphi/crawler/instancemanagement/InstanceRegistry.scala +++ b/src/main/scala/de/upb/cs/swt/delphi/crawler/instancemanagement/InstanceRegistry.scala @@ -18,24 +18,26 @@ package de.upb.cs.swt.delphi.crawler.instancemanagement import java.net.InetAddress +import akka.actor.ActorSystem import akka.http.scaladsl.Http import akka.http.scaladsl.marshalling.Marshal import akka.http.scaladsl.model._ import akka.http.scaladsl.unmarshalling.Unmarshal +import akka.stream.ActorMaterializer import de.upb.cs.swt.delphi.crawler.{AppLogging, Configuration, Crawler} import de.upb.cs.swt.delphi.crawler.io.swagger.client.model.InstanceEnums.ComponentType import de.upb.cs.swt.delphi.crawler.io.swagger.client.model.{Instance, JsonSupport} import scala.concurrent.duration.Duration -import scala.concurrent.{Await, Future} +import scala.concurrent.{Await, ExecutionContext, Future} import scala.util.{Failure, Success, Try} object InstanceRegistry extends JsonSupport with AppLogging { - implicit val system = Crawler.system - implicit val ec = system.dispatcher - implicit val materializer = Crawler.materializer + implicit val system : ActorSystem = Crawler.system + implicit val ec : ExecutionContext = system.dispatcher + implicit val materializer : ActorMaterializer = Crawler.materializer def register(configuration: Configuration) : Try[Long] = { @@ -102,7 +104,9 @@ object InstanceRegistry extends JsonSupport with AppLogging Failure(new RuntimeException("Cannot post matching result to Instance Registry, assigned ElasticSearch instance has no ID.")) } else { val IdToPost = configuration.elasticsearchInstance.iD.get - val request = HttpRequest(method = HttpMethods.POST, configuration.instanceRegistryUri + s"/matchingResult?Id=$IdToPost&MatchingSuccessful=$isElasticSearchReachable") + val request = HttpRequest( + method = HttpMethods.POST, + configuration.instanceRegistryUri + s"/matchingResult?Id=$IdToPost&MatchingSuccessful=$isElasticSearchReachable") Await.result(Http(system).singleRequest(request) map {response => if(response.status == StatusCodes.OK){ @@ -157,5 +161,6 @@ object InstanceRegistry extends JsonSupport with AppLogging } - private def createInstance(id: Option[Long], controlPort : Int, name : String) : Instance = Instance(id, Option(InetAddress.getLocalHost().getHostAddress()), Option(controlPort), Option(name), Option(ComponentType.Crawler)) + private def createInstance(id: Option[Long], controlPort : Int, name : String) : Instance = + Instance(id, Option(InetAddress.getLocalHost.getHostAddress), Option(controlPort), Option(name), Option(ComponentType.Crawler)) } diff --git a/src/main/scala/de/upb/cs/swt/delphi/crawler/io/swagger/client/model/Instance.scala b/src/main/scala/de/upb/cs/swt/delphi/crawler/io/swagger/client/model/Instance.scala index a46d67c..092663b 100644 --- a/src/main/scala/de/upb/cs/swt/delphi/crawler/io/swagger/client/model/Instance.scala +++ b/src/main/scala/de/upb/cs/swt/delphi/crawler/io/swagger/client/model/Instance.scala @@ -11,7 +11,7 @@ import spray.json._ trait JsonSupport extends SprayJsonSupport with DefaultJsonProtocol { implicit val componentTypeFormat = new JsonFormat[InstanceEnums.ComponentType] { - def write(compType : InstanceEnums.ComponentType) = JsString(compType.toString()) + def write(compType : InstanceEnums.ComponentType) = JsString(compType.toString) def read(value: JsValue) = value match { case JsString(s) => s match { diff --git a/src/main/scala/de/upb/cs/swt/delphi/crawler/storage/ElasticReachablePreflightCheck.scala b/src/main/scala/de/upb/cs/swt/delphi/crawler/storage/ElasticReachablePreflightCheck.scala index f54b770..8e4a128 100644 --- a/src/main/scala/de/upb/cs/swt/delphi/crawler/storage/ElasticReachablePreflightCheck.scala +++ b/src/main/scala/de/upb/cs/swt/delphi/crawler/storage/ElasticReachablePreflightCheck.scala @@ -46,13 +46,13 @@ object ElasticReachablePreflightCheck extends PreflightCheck { val f = (client.execute { nodeInfo() } map { i => { - if(configuration.usingInstanceRegistry) InstanceRegistry.sendMatchingResult(true, configuration) + if(configuration.usingInstanceRegistry) InstanceRegistry.sendMatchingResult(isElasticSearchReachable = true, configuration) Success(configuration) } - } recover { case e => { - if(configuration.usingInstanceRegistry) InstanceRegistry.sendMatchingResult(false, configuration) + } recover { case e => + if(configuration.usingInstanceRegistry) InstanceRegistry.sendMatchingResult(isElasticSearchReachable = false, configuration) Failure(e) - } + }).andThen { case _ => client.close() } From 18a71a0ea7fe72d39cf061d05f24e8337340d82d Mon Sep 17 00:00:00 2001 From: Johannes Duesing Date: Fri, 7 Sep 2018 09:07:30 +0200 Subject: [PATCH 12/20] Restored setting of Hermes config, fails on Linux and Windows --- src/main/scala/de/upb/cs/swt/delphi/crawler/Crawler.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/scala/de/upb/cs/swt/delphi/crawler/Crawler.scala b/src/main/scala/de/upb/cs/swt/delphi/crawler/Crawler.scala index a5fb8da..2298f64 100644 --- a/src/main/scala/de/upb/cs/swt/delphi/crawler/Crawler.scala +++ b/src/main/scala/de/upb/cs/swt/delphi/crawler/Crawler.scala @@ -43,7 +43,7 @@ object Crawler extends App with AppLogging { implicit val materializer = ActorMaterializer() OPALLogger.updateLogger(GlobalLogContext, OPALLogAdapter) - //HermesAnalyzer.setConfig() + HermesAnalyzer.setConfig() sys.addShutdownHook(() => { log.warning("Received shutdown signal.") From cc45162e138b1992e077e51636c8c5897aeb4b5e Mon Sep 17 00:00:00 2001 From: Johannes Duesing Date: Fri, 7 Sep 2018 09:32:05 +0200 Subject: [PATCH 13/20] CodeStyle: Replaced .get with .getOrElse --- .../de/upb/cs/swt/delphi/crawler/Configuration.scala | 9 +++++---- .../crawler/instancemanagement/InstanceRegistry.scala | 6 +++--- 2 files changed, 8 insertions(+), 7 deletions(-) diff --git a/src/main/scala/de/upb/cs/swt/delphi/crawler/Configuration.scala b/src/main/scala/de/upb/cs/swt/delphi/crawler/Configuration.scala index 5e2c746..86ce6e2 100644 --- a/src/main/scala/de/upb/cs/swt/delphi/crawler/Configuration.scala +++ b/src/main/scala/de/upb/cs/swt/delphi/crawler/Configuration.scala @@ -31,9 +31,9 @@ class Configuration { lazy val elasticsearchClientUri: ElasticsearchClientUri = ElasticsearchClientUri({ if(elasticsearchInstance.portnumber.isEmpty){ - elasticsearchInstance.iP.get + elasticsearchInstance.iP.getOrElse("elasticsearch://localhost:" + defaultElasticSearchPort) }else{ - elasticsearchInstance.iP.get + ":" + elasticsearchInstance.portnumber.get + elasticsearchInstance.iP.getOrElse("elasticsearch://localhost") + ":" + elasticsearchInstance.portnumber.getOrElse(defaultElasticSearchPort) } }) @@ -41,14 +41,15 @@ class Configuration { case Success(instance) => instance case Failure(_) => Instance( None, - Some(sys.env.getOrElse("DELPHI_ELASTIC_URI","elasticsearch://localhost:9200")), - None, + Some(sys.env.getOrElse("DELPHI_ELASTIC_URI","elasticsearch://localhost")), + Some(defaultElasticSearchPort), Some("Default ElasticSearch instance"), Some(ComponentType.ElasticSearch)) } val mavenRepoBase: URI = new URI("http://repo1.maven.org/maven2/") // TODO: Create a local demo server "http://localhost:8881/maven2/" val controlServerPort : Int = 8882 + val defaultElasticSearchPort : Int = 9200 val limit : Int = 50 val throttle : Throttle = Throttle(5, 30 second, 5, ThrottleMode.shaping) diff --git a/src/main/scala/de/upb/cs/swt/delphi/crawler/instancemanagement/InstanceRegistry.scala b/src/main/scala/de/upb/cs/swt/delphi/crawler/instancemanagement/InstanceRegistry.scala index 7cc8c50..d85ec81 100644 --- a/src/main/scala/de/upb/cs/swt/delphi/crawler/instancemanagement/InstanceRegistry.scala +++ b/src/main/scala/de/upb/cs/swt/delphi/crawler/instancemanagement/InstanceRegistry.scala @@ -103,10 +103,10 @@ object InstanceRegistry extends JsonSupport with AppLogging if(configuration.elasticsearchInstance.iD.isEmpty) { Failure(new RuntimeException("Cannot post matching result to Instance Registry, assigned ElasticSearch instance has no ID.")) } else { - val IdToPost = configuration.elasticsearchInstance.iD.get + val idToPost = configuration.elasticsearchInstance.iD.getOrElse(-1L) val request = HttpRequest( method = HttpMethods.POST, - configuration.instanceRegistryUri + s"/matchingResult?Id=$IdToPost&MatchingSuccessful=$isElasticSearchReachable") + configuration.instanceRegistryUri + s"/matchingResult?Id=$idToPost&MatchingSuccessful=$isElasticSearchReachable") Await.result(Http(system).singleRequest(request) map {response => if(response.status == StatusCodes.OK){ @@ -132,7 +132,7 @@ object InstanceRegistry extends JsonSupport with AppLogging if(!configuration.usingInstanceRegistry){ Failure(new RuntimeException("Cannot deregister from Instance Registry, no Instance Registry available.")) } else { - val id : Long = configuration.assignedID.get + val id : Long = configuration.assignedID.getOrElse(-1L) val request = HttpRequest(method = HttpMethods.POST, configuration.instanceRegistryUri + s"/deregister?Id=$id") From 81e8482c6e980ef9d17ef0a0fee62cd4e81e7d01 Mon Sep 17 00:00:00 2001 From: Johannes Duesing Date: Sat, 8 Sep 2018 12:00:46 +0200 Subject: [PATCH 14/20] Moved default host to val, removed unused resolver/unnecessary condition --- build.sbt | 1 - .../de/upb/cs/swt/delphi/crawler/Configuration.scala | 9 +++++---- .../crawler/storage/ElasticReachablePreflightCheck.scala | 4 ++-- 3 files changed, 7 insertions(+), 7 deletions(-) diff --git a/build.sbt b/build.sbt index bc955a8..0070144 100644 --- a/build.sbt +++ b/build.sbt @@ -73,7 +73,6 @@ libraryDependencies ++= Seq( ) resolvers += "Sonatype OSS Snapshots" at "https://oss.sonatype.org/content/repositories/snapshots" -resolvers ++= Seq(Resolver.mavenLocal) val opalVersion = "1.0.0" libraryDependencies ++= Seq( diff --git a/src/main/scala/de/upb/cs/swt/delphi/crawler/Configuration.scala b/src/main/scala/de/upb/cs/swt/delphi/crawler/Configuration.scala index 86ce6e2..f82bc2b 100644 --- a/src/main/scala/de/upb/cs/swt/delphi/crawler/Configuration.scala +++ b/src/main/scala/de/upb/cs/swt/delphi/crawler/Configuration.scala @@ -31,9 +31,9 @@ class Configuration { lazy val elasticsearchClientUri: ElasticsearchClientUri = ElasticsearchClientUri({ if(elasticsearchInstance.portnumber.isEmpty){ - elasticsearchInstance.iP.getOrElse("elasticsearch://localhost:" + defaultElasticSearchPort) + elasticsearchInstance.iP.getOrElse(defaultElasticSearchHost + ":" + defaultElasticSearchPort) }else{ - elasticsearchInstance.iP.getOrElse("elasticsearch://localhost") + ":" + elasticsearchInstance.portnumber.getOrElse(defaultElasticSearchPort) + elasticsearchInstance.iP.getOrElse(defaultElasticSearchHost) + ":" + elasticsearchInstance.portnumber.getOrElse(defaultElasticSearchPort) } }) @@ -41,8 +41,8 @@ class Configuration { case Success(instance) => instance case Failure(_) => Instance( None, - Some(sys.env.getOrElse("DELPHI_ELASTIC_URI","elasticsearch://localhost")), - Some(defaultElasticSearchPort), + Some(sys.env.getOrElse("DELPHI_ELASTIC_URI",defaultElasticSearchHost + ":" + defaultElasticSearchPort)), + None, Some("Default ElasticSearch instance"), Some(ComponentType.ElasticSearch)) } @@ -50,6 +50,7 @@ class Configuration { val mavenRepoBase: URI = new URI("http://repo1.maven.org/maven2/") // TODO: Create a local demo server "http://localhost:8881/maven2/" val controlServerPort : Int = 8882 val defaultElasticSearchPort : Int = 9200 + val defaultElasticSearchHost : String = "elasticsearch://localhost" val limit : Int = 50 val throttle : Throttle = Throttle(5, 30 second, 5, ThrottleMode.shaping) diff --git a/src/main/scala/de/upb/cs/swt/delphi/crawler/storage/ElasticReachablePreflightCheck.scala b/src/main/scala/de/upb/cs/swt/delphi/crawler/storage/ElasticReachablePreflightCheck.scala index 8e4a128..84db188 100644 --- a/src/main/scala/de/upb/cs/swt/delphi/crawler/storage/ElasticReachablePreflightCheck.scala +++ b/src/main/scala/de/upb/cs/swt/delphi/crawler/storage/ElasticReachablePreflightCheck.scala @@ -46,11 +46,11 @@ object ElasticReachablePreflightCheck extends PreflightCheck { val f = (client.execute { nodeInfo() } map { i => { - if(configuration.usingInstanceRegistry) InstanceRegistry.sendMatchingResult(isElasticSearchReachable = true, configuration) + InstanceRegistry.sendMatchingResult(isElasticSearchReachable = true, configuration) Success(configuration) } } recover { case e => - if(configuration.usingInstanceRegistry) InstanceRegistry.sendMatchingResult(isElasticSearchReachable = false, configuration) + InstanceRegistry.sendMatchingResult(isElasticSearchReachable = false, configuration) Failure(e) }).andThen { From 10fb5bccda8598e5a2b30e96a06075fc83d7326f Mon Sep 17 00:00:00 2001 From: Johannes Duesing Date: Sun, 9 Sep 2018 14:34:20 +0200 Subject: [PATCH 15/20] Made class 'Instance' not use Options anymore --- .../cs/swt/delphi/crawler/Configuration.scala | 43 +++++++++++++------ .../instancemanagement/InstanceRegistry.scala | 6 +-- .../io/swagger/client/model/Instance.scala | 8 ++-- 3 files changed, 38 insertions(+), 19 deletions(-) diff --git a/src/main/scala/de/upb/cs/swt/delphi/crawler/Configuration.scala b/src/main/scala/de/upb/cs/swt/delphi/crawler/Configuration.scala index f82bc2b..d37868b 100644 --- a/src/main/scala/de/upb/cs/swt/delphi/crawler/Configuration.scala +++ b/src/main/scala/de/upb/cs/swt/delphi/crawler/Configuration.scala @@ -25,32 +25,51 @@ import de.upb.cs.swt.delphi.crawler.io.swagger.client.model.Instance import de.upb.cs.swt.delphi.crawler.io.swagger.client.model.InstanceEnums.ComponentType import scala.concurrent.duration._ -import scala.util.{Failure, Success} +import scala.util.{Failure, Success, Try} class Configuration { - lazy val elasticsearchClientUri: ElasticsearchClientUri = ElasticsearchClientUri({ - if(elasticsearchInstance.portnumber.isEmpty){ - elasticsearchInstance.iP.getOrElse(defaultElasticSearchHost + ":" + defaultElasticSearchPort) - }else{ - elasticsearchInstance.iP.getOrElse(defaultElasticSearchHost) + ":" + elasticsearchInstance.portnumber.getOrElse(defaultElasticSearchPort) - } - }) + lazy val elasticsearchClientUri: ElasticsearchClientUri = ElasticsearchClientUri( + elasticsearchInstance.host + ":" + elasticsearchInstance.portnumber) lazy val elasticsearchInstance : Instance = InstanceRegistry.retrieveElasticSearchInstance(this) match { case Success(instance) => instance case Failure(_) => Instance( None, - Some(sys.env.getOrElse("DELPHI_ELASTIC_URI",defaultElasticSearchHost + ":" + defaultElasticSearchPort)), - None, - Some("Default ElasticSearch instance"), - Some(ComponentType.ElasticSearch)) + fallbackElasticSearchHost, + fallbackElasticSearchPort, + "Default ElasticSearch instance", + ComponentType.ElasticSearch) } val mavenRepoBase: URI = new URI("http://repo1.maven.org/maven2/") // TODO: Create a local demo server "http://localhost:8881/maven2/" val controlServerPort : Int = 8882 + val defaultElasticSearchPort : Int = 9200 val defaultElasticSearchHost : String = "elasticsearch://localhost" + + lazy val fallbackElasticSearchPort : Int = sys.env.get("DELPHI_ELASTIC_URI") match { + case Some(hostString) => if(hostString.count(c => c == ':') == 3){ + Try(hostString.split(":")(2).toInt) match { + case Success(port) => port + case Failure(_) => defaultElasticSearchPort + } + } else { + defaultElasticSearchPort + } + case None => defaultElasticSearchPort + } + + lazy val fallbackElasticSearchHost : String = sys.env.get("DELPHI_ELASTIC_URI") match { + case Some(hostString) => + if(hostString.count(c => c == ':') == 2){ + hostString.substring(0,hostString.lastIndexOf(":")) + } else { + defaultElasticSearchHost + } + case None => defaultElasticSearchHost + + } val limit : Int = 50 val throttle : Throttle = Throttle(5, 30 second, 5, ThrottleMode.shaping) diff --git a/src/main/scala/de/upb/cs/swt/delphi/crawler/instancemanagement/InstanceRegistry.scala b/src/main/scala/de/upb/cs/swt/delphi/crawler/instancemanagement/InstanceRegistry.scala index d85ec81..9697bf7 100644 --- a/src/main/scala/de/upb/cs/swt/delphi/crawler/instancemanagement/InstanceRegistry.scala +++ b/src/main/scala/de/upb/cs/swt/delphi/crawler/instancemanagement/InstanceRegistry.scala @@ -77,8 +77,8 @@ object InstanceRegistry extends JsonSupport with AppLogging if(status == StatusCodes.OK) { Await.result(Unmarshal(response.entity).to[Instance] map {instance => - val elasticIP = instance.iP - log.info(s"Instance Registry assigned ElasticSearch instance at ${elasticIP.getOrElse("None")}") + val elasticIP = instance.host + log.info(s"Instance Registry assigned ElasticSearch instance at $elasticIP") Success(instance) } recover {case ex => log.warning(s"Failed to read response from Instance Registry, exception: $ex") @@ -162,5 +162,5 @@ object InstanceRegistry extends JsonSupport with AppLogging private def createInstance(id: Option[Long], controlPort : Int, name : String) : Instance = - Instance(id, Option(InetAddress.getLocalHost.getHostAddress), Option(controlPort), Option(name), Option(ComponentType.Crawler)) + Instance(id, InetAddress.getLocalHost.getHostAddress, controlPort, name, ComponentType.Crawler) } diff --git a/src/main/scala/de/upb/cs/swt/delphi/crawler/io/swagger/client/model/Instance.scala b/src/main/scala/de/upb/cs/swt/delphi/crawler/io/swagger/client/model/Instance.scala index 092663b..aef8943 100644 --- a/src/main/scala/de/upb/cs/swt/delphi/crawler/io/swagger/client/model/Instance.scala +++ b/src/main/scala/de/upb/cs/swt/delphi/crawler/io/swagger/client/model/Instance.scala @@ -30,11 +30,11 @@ trait JsonSupport extends SprayJsonSupport with DefaultJsonProtocol { final case class Instance ( iD: Option[Long], - iP: Option[String], - portnumber: Option[Long], - name: Option[String], + host: String, + portnumber: Int, + name: String, /* Component Type */ - componentType: Option[InstanceEnums.ComponentType] + componentType: InstanceEnums.ComponentType ) object InstanceEnums { From 1e8ec5ee1d0e86584eeae70c63fd596145de8134 Mon Sep 17 00:00:00 2001 From: Johannes Duesing Date: Sun, 9 Sep 2018 15:10:29 +0200 Subject: [PATCH 16/20] Fixed shutdown hook not being triggered, fixed port of IR --- .../scala/de/upb/cs/swt/delphi/crawler/Configuration.scala | 2 +- src/main/scala/de/upb/cs/swt/delphi/crawler/Crawler.scala | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/main/scala/de/upb/cs/swt/delphi/crawler/Configuration.scala b/src/main/scala/de/upb/cs/swt/delphi/crawler/Configuration.scala index d37868b..b8bc734 100644 --- a/src/main/scala/de/upb/cs/swt/delphi/crawler/Configuration.scala +++ b/src/main/scala/de/upb/cs/swt/delphi/crawler/Configuration.scala @@ -79,7 +79,7 @@ class Configuration { val callGraphStreamPoolSize : Int = 4 val instanceName = "MyCrawlerInstance" - val instanceRegistryUri : String = sys.env.getOrElse("DELPHI_IR_URI", "http://localhost:8085") + val instanceRegistryUri : String = sys.env.getOrElse("DELPHI_IR_URI", "http://localhost:8087") lazy val usingInstanceRegistry : Boolean = assignedID match { case Some(_) => true diff --git a/src/main/scala/de/upb/cs/swt/delphi/crawler/Crawler.scala b/src/main/scala/de/upb/cs/swt/delphi/crawler/Crawler.scala index 2298f64..322fe0b 100644 --- a/src/main/scala/de/upb/cs/swt/delphi/crawler/Crawler.scala +++ b/src/main/scala/de/upb/cs/swt/delphi/crawler/Crawler.scala @@ -43,9 +43,9 @@ object Crawler extends App with AppLogging { implicit val materializer = ActorMaterializer() OPALLogger.updateLogger(GlobalLogContext, OPALLogAdapter) - HermesAnalyzer.setConfig() + //HermesAnalyzer.setConfig() - sys.addShutdownHook(() => { + sys.addShutdownHook({ log.warning("Received shutdown signal.") InstanceRegistry.deregister(configuration) val future = system.terminate() From b2458f04c0479ab3c04bb546e54a5099da9ca5d8 Mon Sep 17 00:00:00 2001 From: Johannes Duesing Date: Wed, 12 Sep 2018 11:42:02 +0200 Subject: [PATCH 17/20] Better handling of getMatchingInstance returning 404 --- .../instancemanagement/InstanceRegistry.scala | 12 +++++++----- .../crawler/storage/ElasticIndexPreflightCheck.scala | 6 +++++- 2 files changed, 12 insertions(+), 6 deletions(-) diff --git a/src/main/scala/de/upb/cs/swt/delphi/crawler/instancemanagement/InstanceRegistry.scala b/src/main/scala/de/upb/cs/swt/delphi/crawler/instancemanagement/InstanceRegistry.scala index 9697bf7..227826f 100644 --- a/src/main/scala/de/upb/cs/swt/delphi/crawler/instancemanagement/InstanceRegistry.scala +++ b/src/main/scala/de/upb/cs/swt/delphi/crawler/instancemanagement/InstanceRegistry.scala @@ -84,10 +84,12 @@ object InstanceRegistry extends JsonSupport with AppLogging log.warning(s"Failed to read response from Instance Registry, exception: $ex") Failure(ex) }, Duration.Inf) - } - else{ - log.warning(s"Failed to read response from Instance Registry, server returned $status") - Failure(new RuntimeException(s"Failed to read response from Instance Registry, server returned $status")) + } else if(status == StatusCodes.NotFound) { + log.warning(s"No matching instance of type 'ElasticSearch' is present at the instance registry.") + Failure(new RuntimeException(s"Instance Registry did not contain matching instance, server returned $status")) + } else { + log.warning(s"Failed to read matching instance from Instance Registry, server returned $status") + Failure(new RuntimeException(s"Failed to read matching instance from Instance Registry, server returned $status")) } } recover { case ex => log.warning(s"Failed to request ElasticSearch instance from Instance Registry, exception: $ex ") @@ -101,7 +103,7 @@ object InstanceRegistry extends JsonSupport with AppLogging Failure(new RuntimeException("Cannot post matching result to Instance Registry, no Instance Registry available.")) } else { if(configuration.elasticsearchInstance.iD.isEmpty) { - Failure(new RuntimeException("Cannot post matching result to Instance Registry, assigned ElasticSearch instance has no ID.")) + Failure(new RuntimeException("The ElasticSearch instance was not assigned by the Instance Registry, so no matching result will be posted.")) } else { val idToPost = configuration.elasticsearchInstance.iD.getOrElse(-1L) val request = HttpRequest( diff --git a/src/main/scala/de/upb/cs/swt/delphi/crawler/storage/ElasticIndexPreflightCheck.scala b/src/main/scala/de/upb/cs/swt/delphi/crawler/storage/ElasticIndexPreflightCheck.scala index 5d168aa..2c8a841 100644 --- a/src/main/scala/de/upb/cs/swt/delphi/crawler/storage/ElasticIndexPreflightCheck.scala +++ b/src/main/scala/de/upb/cs/swt/delphi/crawler/storage/ElasticIndexPreflightCheck.scala @@ -20,6 +20,7 @@ import akka.actor.ActorSystem import akka.http.scaladsl.model.StatusCodes import com.sksamuel.elastic4s.http.ElasticDsl._ import com.sksamuel.elastic4s.http.{ElasticClient, RequestFailure, RequestSuccess} +import de.upb.cs.swt.delphi.crawler.instancemanagement.InstanceRegistry import de.upb.cs.swt.delphi.crawler.{Configuration, PreflightCheck} import scala.concurrent.duration.Duration @@ -46,7 +47,10 @@ object ElasticIndexPreflightCheck extends PreflightCheck with ElasticIndexMainte case false => migrateIndex(configuration) // This needs some work } } - case RequestFailure(_, _, _, e) => Failure(new ElasticException(e)) + case RequestFailure(_, _, _, e) => { + InstanceRegistry.sendMatchingResult(false, configuration) + Failure(new ElasticException(e)) + } } } } From c2d86be22916e6ba3132fdd73a95421ab5bff77d Mon Sep 17 00:00:00 2001 From: Johannes Duesing Date: Wed, 12 Sep 2018 14:46:37 +0200 Subject: [PATCH 18/20] Adapted IR communication to use new attribute names --- .../scala/de/upb/cs/swt/delphi/crawler/Configuration.scala | 4 ++-- .../delphi/crawler/instancemanagement/InstanceRegistry.scala | 4 ++-- .../swt/delphi/crawler/io/swagger/client/model/Instance.scala | 4 ++-- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/src/main/scala/de/upb/cs/swt/delphi/crawler/Configuration.scala b/src/main/scala/de/upb/cs/swt/delphi/crawler/Configuration.scala index b8bc734..978dc9c 100644 --- a/src/main/scala/de/upb/cs/swt/delphi/crawler/Configuration.scala +++ b/src/main/scala/de/upb/cs/swt/delphi/crawler/Configuration.scala @@ -30,7 +30,7 @@ import scala.util.{Failure, Success, Try} class Configuration { lazy val elasticsearchClientUri: ElasticsearchClientUri = ElasticsearchClientUri( - elasticsearchInstance.host + ":" + elasticsearchInstance.portnumber) + elasticsearchInstance.host + ":" + elasticsearchInstance.portNumber) lazy val elasticsearchInstance : Instance = InstanceRegistry.retrieveElasticSearchInstance(this) match { case Success(instance) => instance @@ -49,7 +49,7 @@ class Configuration { val defaultElasticSearchHost : String = "elasticsearch://localhost" lazy val fallbackElasticSearchPort : Int = sys.env.get("DELPHI_ELASTIC_URI") match { - case Some(hostString) => if(hostString.count(c => c == ':') == 3){ + case Some(hostString) => if(hostString.count(c => c == ':') == 2){ Try(hostString.split(":")(2).toInt) match { case Success(port) => port case Failure(_) => defaultElasticSearchPort diff --git a/src/main/scala/de/upb/cs/swt/delphi/crawler/instancemanagement/InstanceRegistry.scala b/src/main/scala/de/upb/cs/swt/delphi/crawler/instancemanagement/InstanceRegistry.scala index 227826f..b50d296 100644 --- a/src/main/scala/de/upb/cs/swt/delphi/crawler/instancemanagement/InstanceRegistry.scala +++ b/src/main/scala/de/upb/cs/swt/delphi/crawler/instancemanagement/InstanceRegistry.scala @@ -102,10 +102,10 @@ object InstanceRegistry extends JsonSupport with AppLogging if(!configuration.usingInstanceRegistry) { Failure(new RuntimeException("Cannot post matching result to Instance Registry, no Instance Registry available.")) } else { - if(configuration.elasticsearchInstance.iD.isEmpty) { + if(configuration.elasticsearchInstance.id.isEmpty) { Failure(new RuntimeException("The ElasticSearch instance was not assigned by the Instance Registry, so no matching result will be posted.")) } else { - val idToPost = configuration.elasticsearchInstance.iD.getOrElse(-1L) + val idToPost = configuration.elasticsearchInstance.id.getOrElse(-1L) val request = HttpRequest( method = HttpMethods.POST, configuration.instanceRegistryUri + s"/matchingResult?Id=$idToPost&MatchingSuccessful=$isElasticSearchReachable") diff --git a/src/main/scala/de/upb/cs/swt/delphi/crawler/io/swagger/client/model/Instance.scala b/src/main/scala/de/upb/cs/swt/delphi/crawler/io/swagger/client/model/Instance.scala index aef8943..252e4d2 100644 --- a/src/main/scala/de/upb/cs/swt/delphi/crawler/io/swagger/client/model/Instance.scala +++ b/src/main/scala/de/upb/cs/swt/delphi/crawler/io/swagger/client/model/Instance.scala @@ -29,9 +29,9 @@ trait JsonSupport extends SprayJsonSupport with DefaultJsonProtocol { } final case class Instance ( - iD: Option[Long], + id: Option[Long], host: String, - portnumber: Int, + portNumber: Int, name: String, /* Component Type */ componentType: InstanceEnums.ComponentType From 3372a897f97bf7bb524665f3e0d4cd208fe50541 Mon Sep 17 00:00:00 2001 From: Johannes Duesing Date: Fri, 14 Sep 2018 14:16:43 +0200 Subject: [PATCH 19/20] CodeStyle: Replaced if-else if with match-case --- .../instancemanagement/InstanceRegistry.scala | 34 +++++++++---------- 1 file changed, 17 insertions(+), 17 deletions(-) diff --git a/src/main/scala/de/upb/cs/swt/delphi/crawler/instancemanagement/InstanceRegistry.scala b/src/main/scala/de/upb/cs/swt/delphi/crawler/instancemanagement/InstanceRegistry.scala index b50d296..1d532af 100644 --- a/src/main/scala/de/upb/cs/swt/delphi/crawler/instancemanagement/InstanceRegistry.scala +++ b/src/main/scala/de/upb/cs/swt/delphi/crawler/instancemanagement/InstanceRegistry.scala @@ -73,23 +73,23 @@ object InstanceRegistry extends JsonSupport with AppLogging val request = HttpRequest(method = HttpMethods.GET, configuration.instanceRegistryUri + "/matchingInstance?ComponentType=ElasticSearch") Await.result(Http(system).singleRequest(request) map {response => - val status = response.status - if(status == StatusCodes.OK) { - - Await.result(Unmarshal(response.entity).to[Instance] map {instance => - val elasticIP = instance.host - log.info(s"Instance Registry assigned ElasticSearch instance at $elasticIP") - Success(instance) - } recover {case ex => - log.warning(s"Failed to read response from Instance Registry, exception: $ex") - Failure(ex) - }, Duration.Inf) - } else if(status == StatusCodes.NotFound) { - log.warning(s"No matching instance of type 'ElasticSearch' is present at the instance registry.") - Failure(new RuntimeException(s"Instance Registry did not contain matching instance, server returned $status")) - } else { - log.warning(s"Failed to read matching instance from Instance Registry, server returned $status") - Failure(new RuntimeException(s"Failed to read matching instance from Instance Registry, server returned $status")) + response.status match { + case StatusCodes.OK => + Await.result(Unmarshal(response.entity).to[Instance] map {instance => + val elasticIP = instance.host + log.info(s"Instance Registry assigned ElasticSearch instance at $elasticIP") + Success(instance) + } recover {case ex => + log.warning(s"Failed to read response from Instance Registry, exception: $ex") + Failure(ex) + }, Duration.Inf) + case StatusCodes.NotFound => + log.warning(s"No matching instance of type 'ElasticSearch' is present at the instance registry.") + Failure(new RuntimeException(s"Instance Registry did not contain matching instance, server returned ${StatusCodes.NotFound}")) + case _ => + val status = response.status + log.warning(s"Failed to read matching instance from Instance Registry, server returned $status") + Failure(new RuntimeException(s"Failed to read matching instance from Instance Registry, server returned $status")) } } recover { case ex => log.warning(s"Failed to request ElasticSearch instance from Instance Registry, exception: $ex ") From 44462afda9e83748301262017fc0420a487b56a6 Mon Sep 17 00:00:00 2001 From: Johannes Duesing Date: Wed, 19 Sep 2018 11:19:15 +0200 Subject: [PATCH 20/20] Fixed merge error (missing comma) --- build.sbt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/build.sbt b/build.sbt index c36f397..af4a00c 100644 --- a/build.sbt +++ b/build.sbt @@ -49,7 +49,7 @@ libraryDependencies ++= Seq( "com.typesafe.akka" %% "akka-stream" % akkaVersion, "com.typesafe.akka" %% "akka-stream-testkit" % akkaVersion % Test, "com.typesafe.akka" %% "akka-slf4j" % akkaVersion, - "com.typesafe.akka" %% "akka-http-spray-json" % "10.0.8" + "com.typesafe.akka" %% "akka-http-spray-json" % "10.0.8", "com.typesafe.akka" %% "akka-http" % "10.1.5" )