Skip to content

Commit 9a2a5b6

Browse files
authored
Merge pull request #24 from delphi-hub/feature/instanceregistry
The Crawler now registers itself at the instance registry
2 parents 2cf291c + 44462af commit 9a2a5b6

File tree

8 files changed

+325
-8
lines changed

8 files changed

+325
-8
lines changed

build.sbt

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,9 +49,12 @@ libraryDependencies ++= Seq(
4949
"com.typesafe.akka" %% "akka-stream" % akkaVersion,
5050
"com.typesafe.akka" %% "akka-stream-testkit" % akkaVersion % Test,
5151
"com.typesafe.akka" %% "akka-slf4j" % akkaVersion,
// Keep every Akka HTTP module on the same version as "akka-http" below;
// mixing 10.0.x and 10.1.x artifacts on one classpath is not supported.
"com.typesafe.akka" %% "akka-http-spray-json" % "10.1.5",
5253
"com.typesafe.akka" %% "akka-http" % "10.1.5"
5354
)
5455

56+
libraryDependencies += "org.json4s" %% "json4s-jackson" % "3.5.3"
57+
5558
libraryDependencies += "ch.qos.logback" % "logback-classic" % "1.2.3" % Runtime
5659

5760
val elastic4sVersion = "6.3.0"

src/main/resources/reference.conf

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
io.swagger.client {
2+
3+
apiRequest {
4+
5+
compression {
6+
enabled: false
7+
size-threshold: 0
8+
}
9+
10+
trust-certificates: true
11+
12+
connection-timeout: 5000ms
13+
14+
default-headers {
15+
"userAgent": "swagger-client_1.0.0"
16+
}
17+
18+
// Lets you define custom HTTP status codes, for example:
19+
// { code: 601, reason: "some custom http status code", success: false }
20+
custom-codes : []
21+
}
22+
}
23+
24+
spray.can.host-connector.max-redirects = 10

src/main/scala/de/upb/cs/swt/delphi/crawler/Configuration.scala

Lines changed: 58 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,13 +20,56 @@ import java.net.URI
2020

2121
import akka.stream.ThrottleMode
2222
import com.sksamuel.elastic4s.ElasticsearchClientUri
23+
import de.upb.cs.swt.delphi.crawler.instancemanagement.InstanceRegistry
24+
import de.upb.cs.swt.delphi.crawler.io.swagger.client.model.Instance
25+
import de.upb.cs.swt.delphi.crawler.io.swagger.client.model.InstanceEnums.ComponentType
2326

2427
import scala.concurrent.duration._
28+
import scala.util.{Failure, Success, Try}
29+
30+
class Configuration {
31+
32+
// Client URI built as "<host>:<port>" from the resolved Elasticsearch instance.
lazy val elasticsearchClientUri: ElasticsearchClientUri = {
  val target = elasticsearchInstance
  ElasticsearchClientUri(s"${target.host}:${target.portNumber}")
}
34+
35+
// Elasticsearch instance to connect to: whatever the Instance Registry hands out,
// or, if that lookup fails for any reason, a locally configured default that
// carries no registry id (id = None).
lazy val elasticsearchInstance: Instance =
  InstanceRegistry.retrieveElasticSearchInstance(this).getOrElse(
    Instance(
      None,
      fallbackElasticSearchHost,
      fallbackElasticSearchPort,
      "Default ElasticSearch instance",
      ComponentType.ElasticSearch))
2544

26-
class Configuration {
27-
val elasticsearchClientUri: ElasticsearchClientUri = ElasticsearchClientUri(sys.env.getOrElse("DELPHI_ELASTIC_URI","elasticsearch://localhost:9200"))
2845
val mavenRepoBase: URI = new URI("http://repo1.maven.org/maven2/") // TODO: Create a local demo server "http://localhost:8881/maven2/"
2946
val controlServerPort : Int = 8882
47+
48+
val defaultElasticSearchPort : Int = 9200
49+
val defaultElasticSearchHost : String = "elasticsearch://localhost"
50+
51+
// Port taken from DELPHI_ELASTIC_URI when that value contains exactly two colons
// (scheme://host:port) and its third colon-separated segment parses as an Int;
// in every other case the default port is used.
lazy val fallbackElasticSearchPort: Int =
  sys.env.get("DELPHI_ELASTIC_URI")
    .filter(uri => uri.count(_ == ':') == 2)
    .flatMap(uri => Try(uri.split(":")(2).toInt).toOption)
    .getOrElse(defaultElasticSearchPort)
62+
63+
// Host part (everything before the last colon, scheme included) of
// DELPHI_ELASTIC_URI when that value contains exactly two colons;
// otherwise the default host is used.
lazy val fallbackElasticSearchHost: String =
  sys.env.get("DELPHI_ELASTIC_URI")
    .filter(uri => uri.count(_ == ':') == 2)
    .map(uri => uri.substring(0, uri.lastIndexOf(":")))
    .getOrElse(defaultElasticSearchHost)
3073
val limit : Int = 50
3174
val throttle : Throttle = Throttle(5, 30 second, 5, ThrottleMode.shaping)
3275

@@ -35,6 +78,19 @@ class Configuration {
3578
val elasticActorPoolSize : Int = 4
3679
val callGraphStreamPoolSize : Int = 4
3780

81+
val instanceName = "MyCrawlerInstance"
82+
val instanceRegistryUri : String = sys.env.getOrElse("DELPHI_IR_URI", "http://localhost:8087")
83+
84+
// True exactly when registration produced an id, i.e. assignedID is defined.
lazy val usingInstanceRegistry: Boolean = assignedID.isDefined
88+
89+
// Id returned by InstanceRegistry.register on first access; None when registration failed.
lazy val assignedID: Option[Long] = InstanceRegistry.register(this).toOption
93+
3894
case class Throttle(element : Int, per : FiniteDuration, maxBurst : Int, mode : ThrottleMode)
3995
}
4096

src/main/scala/de/upb/cs/swt/delphi/crawler/Crawler.scala

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import com.sksamuel.elastic4s.http.ElasticClient
2222
import de.upb.cs.swt.delphi.crawler.control.Server
2323
import de.upb.cs.swt.delphi.crawler.discovery.maven.MavenCrawlActor
2424
import de.upb.cs.swt.delphi.crawler.discovery.maven.MavenCrawlActor.Start
25+
import de.upb.cs.swt.delphi.crawler.instancemanagement.InstanceRegistry
2526
import de.upb.cs.swt.delphi.crawler.preprocessing.PreprocessingDispatchActor
2627
import de.upb.cs.swt.delphi.crawler.processing.{HermesActor, HermesAnalyzer, ProcessingDispatchActor}
2728
import de.upb.cs.swt.delphi.crawler.storage.ElasticActor
@@ -42,10 +43,11 @@ object Crawler extends App with AppLogging {
4243
implicit val materializer = ActorMaterializer()
4344

4445
OPALLogger.updateLogger(GlobalLogContext, OPALLogAdapter)
45-
HermesAnalyzer.setConfig()
46+
//HermesAnalyzer.setConfig()
4647

47-
sys.addShutdownHook(() => {
48+
sys.addShutdownHook({
4849
log.warning("Received shutdown signal.")
50+
InstanceRegistry.deregister(configuration)
4951
val future = system.terminate()
5052
Await.result(future, 120.seconds)
5153
})
@@ -55,6 +57,7 @@ object Crawler extends App with AppLogging {
5557
Startup.preflightCheck(configuration) match {
5658
case Success(c) =>
5759
case Failure(e) => {
60+
InstanceRegistry.deregister(configuration)
5861
system.terminate()
5962
sys.exit(1)
6063
}
Lines changed: 168 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,168 @@
1+
// Copyright (C) 2018 The Delphi Team.
2+
// See the LICENCE file distributed with this work for additional
3+
// information regarding copyright ownership.
4+
//
5+
// Licensed under the Apache License, Version 2.0 (the "License");
6+
// you may not use this file except in compliance with the License.
7+
// You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing, software
12+
// distributed under the License is distributed on an "AS IS" BASIS,
13+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
// See the License for the specific language governing permissions and
15+
// limitations under the License.
16+
17+
package de.upb.cs.swt.delphi.crawler.instancemanagement
18+
19+
import java.net.InetAddress
20+
21+
import akka.actor.ActorSystem
22+
import akka.http.scaladsl.Http
23+
import akka.http.scaladsl.marshalling.Marshal
24+
import akka.http.scaladsl.model._
25+
import akka.http.scaladsl.unmarshalling.Unmarshal
26+
import akka.stream.ActorMaterializer
27+
import de.upb.cs.swt.delphi.crawler.{AppLogging, Configuration, Crawler}
28+
import de.upb.cs.swt.delphi.crawler.io.swagger.client.model.InstanceEnums.ComponentType
29+
import de.upb.cs.swt.delphi.crawler.io.swagger.client.model.{Instance, JsonSupport}
30+
31+
import scala.concurrent.duration.Duration
32+
import scala.concurrent.{Await, ExecutionContext, Future}
33+
import scala.util.{Failure, Success, Try}
34+
35+
/**
  * HTTP client for the Instance Registry.
  *
  * Every public operation issues a single HTTP request against
  * `configuration.instanceRegistryUri`, blocks until the exchange has finished and
  * reports the outcome as a [[scala.util.Try]]; failures are logged as warnings and
  * returned as `Failure`, they are not thrown.
  *
  * NOTE(review): all operations wait with `Duration.Inf`, so an unresponsive registry
  * blocks the calling thread indefinitely. Consider introducing a finite timeout.
  */
object InstanceRegistry extends JsonSupport with AppLogging {

  implicit val system: ActorSystem = Crawler.system
  implicit val ec: ExecutionContext = system.dispatcher
  implicit val materializer: ActorMaterializer = Crawler.materializer

  /**
    * Registers this crawler by POSTing its instance description to `/register`.
    *
    * @param configuration supplies the registry URI, the control server port and the instance name
    * @return `Success(id)` with the id parsed from the response body on HTTP 200;
    *         `Failure` when the request fails, the status is not 200 or the body is not a valid Long
    */
  def register(configuration: Configuration): Try[Long] = {
    val instance = createInstance(None, configuration.controlServerPort, configuration.instanceName)

    // Compose the response handling with flatMap instead of blocking inside a Future
    // callback; only the outermost call waits for the result.
    val registration: Future[Try[Long]] =
      postInstance(instance, configuration.instanceRegistryUri + "/register")()
        .flatMap[Try[Long]] { response =>
          if (response.status == StatusCodes.OK) {
            Unmarshal(response.entity).to[String]
              .map[Try[Long]] { assignedID =>
                val id = assignedID.toLong
                log.info(s"Successfully registered at Instance Registry, got ID $id.")
                Success(id)
              }
              .recover { case ex =>
                log.warning(s"Failed to read assigned ID from Instance Registry, exception: $ex")
                Failure(ex)
              }
          } else {
            // The body is not read on this branch; it must still be drained so the
            // pooled connection is released.
            response.discardEntityBytes()
            val statuscode = response.status
            log.warning(s"Failed to register at Instance Registry, server returned $statuscode")
            Future.successful[Try[Long]](
              Failure(new RuntimeException(s"Failed to register at Instance Registry, server returned $statuscode")))
          }
        }
        .recover { case ex =>
          log.warning(s"Failed to register at Instance Registry, exception: $ex")
          Failure(ex)
        }

    Await.result(registration, Duration.Inf)
  }

  /**
    * Asks the registry for an ElasticSearch instance via `GET /matchingInstance`.
    *
    * @param configuration supplies the registry URI and whether a registry is in use
    * @return `Success(instance)` on HTTP 200 with a readable body; `Failure` when no registry
    *         is in use, the registry answers 404 or any other status, or the exchange fails
    */
  def retrieveElasticSearchInstance(configuration: Configuration): Try[Instance] = {
    if (!configuration.usingInstanceRegistry) {
      Failure(new RuntimeException("Cannot get ElasticSearch instance from Instance Registry, no Instance Registry available."))
    } else {
      val request = HttpRequest(
        method = HttpMethods.GET,
        uri = configuration.instanceRegistryUri + "/matchingInstance?ComponentType=ElasticSearch")

      val lookup: Future[Try[Instance]] =
        Http(system).singleRequest(request)
          .flatMap[Try[Instance]] { response =>
            response.status match {
              case StatusCodes.OK =>
                Unmarshal(response.entity).to[Instance]
                  .map[Try[Instance]] { instance =>
                    val elasticIP = instance.host
                    log.info(s"Instance Registry assigned ElasticSearch instance at $elasticIP")
                    Success(instance)
                  }
                  .recover { case ex =>
                    log.warning(s"Failed to read response from Instance Registry, exception: $ex")
                    Failure(ex)
                  }
              case StatusCodes.NotFound =>
                response.discardEntityBytes()
                log.warning(s"No matching instance of type 'ElasticSearch' is present at the instance registry.")
                Future.successful[Try[Instance]](
                  Failure(new RuntimeException(s"Instance Registry did not contain matching instance, server returned ${StatusCodes.NotFound}")))
              case _ =>
                response.discardEntityBytes()
                val status = response.status
                log.warning(s"Failed to read matching instance from Instance Registry, server returned $status")
                Future.successful[Try[Instance]](
                  Failure(new RuntimeException(s"Failed to read matching instance from Instance Registry, server returned $status")))
            }
          }
          .recover { case ex =>
            log.warning(s"Failed to request ElasticSearch instance from Instance Registry, exception: $ex ")
            Failure(ex)
          }

      Await.result(lookup, Duration.Inf)
    }
  }

  /**
    * Reports via `POST /matchingResult` whether the assigned ElasticSearch instance was reachable.
    *
    * @param isElasticSearchReachable value sent as the `MatchingSuccessful` query parameter
    * @param configuration            supplies the registry URI and the ElasticSearch instance in use
    * @return `Success(())` on HTTP 200; `Failure` when no registry is in use, the ElasticSearch
    *         instance carries no id, the status is not 200 or the request fails
    */
  def sendMatchingResult(isElasticSearchReachable: Boolean, configuration: Configuration): Try[Unit] = {
    if (!configuration.usingInstanceRegistry) {
      Failure(new RuntimeException("Cannot post matching result to Instance Registry, no Instance Registry available."))
    } else {
      configuration.elasticsearchInstance.id match {
        case None =>
          Failure(new RuntimeException("The ElasticSearch instance was not assigned by the Instance Registry, so no matching result will be posted."))

        case Some(idToPost) =>
          val request = HttpRequest(
            method = HttpMethods.POST,
            uri = configuration.instanceRegistryUri + s"/matchingResult?Id=$idToPost&MatchingSuccessful=$isElasticSearchReachable")

          val posting: Future[Try[Unit]] =
            Http(system).singleRequest(request)
              .map[Try[Unit]] { response =>
                // Only the status code matters here; drain the unused body.
                response.discardEntityBytes()
                if (response.status == StatusCodes.OK) {
                  log.info("Successfully posted matching result to Instance Registry.")
                  Success(())
                } else {
                  val statuscode = response.status
                  log.warning(s"Failed to post matching result to Instance Registry, server returned $statuscode")
                  Failure(new RuntimeException(s"Failed to post matching result to Instance Registry, server returned $statuscode"))
                }
              }
              .recover { case ex =>
                log.warning(s"Failed to post matching result to Instance Registry, exception: $ex")
                Failure(new RuntimeException(s"Failed to post matching result to Instance Registry, exception: $ex"))
              }

          Await.result(posting, Duration.Inf)
      }
    }
  }

  /**
    * Deregisters this crawler via `POST /deregister` using the id obtained at registration.
    *
    * @param configuration supplies the registry URI and the assigned id
    * @return `Success(())` on HTTP 200; `Failure` when no registry is in use, the status is
    *         not 200 or the request fails
    */
  def deregister(configuration: Configuration): Try[Unit] = {
    if (!configuration.usingInstanceRegistry) {
      Failure(new RuntimeException("Cannot deregister from Instance Registry, no Instance Registry available."))
    } else {
      val id: Long = configuration.assignedID.getOrElse(-1L)

      val request = HttpRequest(method = HttpMethods.POST, uri = configuration.instanceRegistryUri + s"/deregister?Id=$id")

      val deregistration: Future[Try[Unit]] =
        Http(system).singleRequest(request)
          .map[Try[Unit]] { response =>
            // Only the status code matters here; drain the unused body.
            response.discardEntityBytes()
            if (response.status == StatusCodes.OK) {
              log.info("Successfully deregistered from Instance Registry.")
              Success(())
            } else {
              val statuscode = response.status
              log.warning(s"Failed to deregister from Instance Registry, server returned $statuscode")
              Failure(new RuntimeException(s"Failed to deregister from Instance Registry, server returned $statuscode"))
            }
          }
          .recover { case ex =>
            log.warning(s"Failed to deregister to Instance Registry, exception: $ex")
            Failure(ex)
          }

      Await.result(deregistration, Duration.Inf)
    }
  }

  /**
    * Marshals `instance` to a request entity and POSTs it to `uri`.
    *
    * The empty second parameter list is kept for source compatibility with existing callers.
    *
    * @return the raw HTTP response; the caller is responsible for consuming or discarding its entity
    */
  def postInstance(instance: Instance, uri: String)(): Future[HttpResponse] =
    Marshal(instance).to[RequestEntity] flatMap { entity =>
      val request = HttpRequest(method = HttpMethods.POST, uri = uri, entity = entity)
      Http(system).singleRequest(request)
    }

  /** Builds the description of this crawler, using the local host address as host. */
  private def createInstance(id: Option[Long], controlPort: Int, name: String): Instance =
    Instance(id, InetAddress.getLocalHost.getHostAddress, controlPort, name, ComponentType.Crawler)
}
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
/**
2+
* NOTE: This class is auto generated by the akka-scala (beta) swagger code generator program.
3+
* https://github.com/swagger-api/swagger-codegen
4+
* For any issue or feedback, please open a ticket via https://github.com/swagger-api/swagger-codegen/issues/new
5+
*/
6+
7+
package de.upb.cs.swt.delphi.crawler.io.swagger.client.model
8+
9+
import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport
10+
import spray.json._
11+
12+
/**
  * spray-json formats for the Instance Registry model: component types are written
  * as their string name, and [[Instance]] uses the five-field product format.
  */
trait JsonSupport extends SprayJsonSupport with DefaultJsonProtocol {

  implicit val componentTypeFormat: JsonFormat[InstanceEnums.ComponentType] =
    new JsonFormat[InstanceEnums.ComponentType] {

      def write(compType: InstanceEnums.ComponentType): JsValue = JsString(compType.toString)

      // Accepts only a JSON string naming one of the declared component types;
      // anything else is rejected with a RuntimeException.
      def read(value: JsValue): InstanceEnums.ComponentType = value match {
        case JsString(name) =>
          InstanceEnums.ComponentType.values
            .find(_.toString == name)
            .getOrElse(throw new RuntimeException(s"Unexpected string value $name for component type."))
        case other =>
          throw new RuntimeException(s"Unexpected type $other while deserializing component type.")
      }
    }

  implicit val instanceFormat: RootJsonFormat[Instance] = jsonFormat5(Instance)
}
30+
31+
/**
  * Description of one component instance as exchanged with the Instance Registry.
  *
  * @param id            identifier of the instance, if it has one
  * @param host          host of the instance
  * @param portNumber    port of the instance
  * @param name          name of the instance
  * @param componentType kind of component this instance represents
  */
final case class Instance(
  id: Option[Long],
  host: String,
  portNumber: Int,
  name: String,
  componentType: InstanceEnums.ComponentType
)
39+
40+
/** Enumerations used by the Instance Registry model. */
object InstanceEnums {

  type ComponentType = ComponentType.Value

  /** Kinds of components; each value's string form equals its identifier. */
  object ComponentType extends Enumeration {
    val Crawler: Value          = Value("Crawler")
    val WebApi: Value           = Value("WebApi")
    val WebApp: Value           = Value("WebApp")
    val DelphiManagement: Value = Value("DelphiManagement")
    val ElasticSearch: Value    = Value("ElasticSearch")
  }

}
52+

src/main/scala/de/upb/cs/swt/delphi/crawler/storage/ElasticIndexPreflightCheck.scala

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import akka.actor.ActorSystem
2020
import akka.http.scaladsl.model.StatusCodes
2121
import com.sksamuel.elastic4s.http.ElasticDsl._
2222
import com.sksamuel.elastic4s.http.{ElasticClient, RequestFailure, RequestSuccess}
23+
import de.upb.cs.swt.delphi.crawler.instancemanagement.InstanceRegistry
2324
import de.upb.cs.swt.delphi.crawler.{Configuration, PreflightCheck}
2425

2526
import scala.concurrent.duration.Duration
@@ -46,7 +47,10 @@ object ElasticIndexPreflightCheck extends PreflightCheck with ElasticIndexMainte
4647
case false => migrateIndex(configuration) // This needs some work
4748
}
4849
}
49-
case RequestFailure(_, _, _, e) => Failure(new ElasticException(e))
50+
case RequestFailure(_, _, _, e) => {
51+
InstanceRegistry.sendMatchingResult(false, configuration)
52+
Failure(new ElasticException(e))
53+
}
5054
}
5155
}
5256
}

0 commit comments

Comments
 (0)