Skip to content

Commit a9f8b2e

Browse files
committed
Changes for Instance Registry
1 parent c2488e0 commit a9f8b2e

File tree

1 file changed

+151
-0
lines changed

1 file changed

+151
-0
lines changed
Lines changed: 151 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,151 @@
1+
package de.upb.cs.swt.delphi.instancemanagement
2+
3+
import java.net.InetAddress
4+
5+
import akka.actor.ActorSystem
6+
import akka.http.scaladsl.Http
7+
import akka.http.scaladsl.marshalling.Marshal
8+
import akka.http.scaladsl.model._
9+
import akka.http.scaladsl.unmarshalling.Unmarshal
10+
import akka.stream.ActorMaterializer
11+
import de.upb.cs.swt.delphi.instancemanagement.InstanceEnums.ComponentType
12+
import de.upb.cs.swt.delphi.webapi.{AppLogging, Configuration, Server}
13+
14+
import scala.concurrent.{Await, ExecutionContext, Future}
15+
import scala.concurrent.duration.Duration
16+
import scala.util.{Failure, Success, Try}
17+
18+
object InstanceRegistry extends JsonSupport with AppLogging
19+
{
20+
21+
implicit val system : ActorSystem = Server.system
22+
implicit val ec : ExecutionContext = system.dispatcher
23+
implicit val materializer : ActorMaterializer = Server.materializer
24+
25+
26+
def register(configuration: Configuration) : Try[Long] = {
27+
val instance = createInstance(None,configuration.bindPort, configuration.instanceName)
28+
29+
Await.result(postInstance(instance, configuration.instanceRegistryUri + "/register") map {response =>
30+
if(response.status == StatusCodes.OK){
31+
Await.result(Unmarshal(response.entity).to[String] map { assignedID =>
32+
val id = assignedID.toLong
33+
log.info(s"Successfully registered at Instance Registry, got ID $id.")
34+
Success(id)
35+
} recover { case ex =>
36+
log.warning(s"Failed to read assigned ID from Instance Registry, exception: $ex")
37+
Failure(ex)
38+
}, Duration.Inf)
39+
}
40+
else {
41+
val statuscode = response.status
42+
log.warning(s"Failed to register at Instance Registry, server returned $statuscode")
43+
Failure(new RuntimeException(s"Failed to register at Instance Registry, server returned $statuscode"))
44+
}
45+
46+
} recover {case ex =>
47+
log.warning(s"Failed to register at Instance Registry, exception: $ex")
48+
Failure(ex)
49+
}, Duration.Inf)
50+
}
51+
52+
def retrieveElasticSearchInstance(configuration: Configuration) : Try[Instance] = {
53+
if(!configuration.usingInstanceRegistry) {
54+
Failure(new RuntimeException("Cannot get ElasticSearch instance from Instance Registry, no Instance Registry available."))
55+
} else {
56+
val request = HttpRequest(method = HttpMethods.GET, configuration.instanceRegistryUri + "/matchingInstance?ComponentType=ElasticSearch")
57+
58+
Await.result(Http(system).singleRequest(request) map {response =>
59+
val status = response.status
60+
if(status == StatusCodes.OK) {
61+
62+
Await.result(Unmarshal(response.entity).to[Instance] map {instance =>
63+
val elasticIP = instance.host
64+
log.info(s"Instance Registry assigned ElasticSearch instance at $elasticIP ")
65+
Success(instance)
66+
} recover {case ex =>
67+
log.warning(s"Failed to read response from Instance Registry, exception: $ex")
68+
Failure(ex)
69+
}, Duration.Inf)
70+
}
71+
else{
72+
log.warning(s"Failed to read response from Instance Registry, server returned $status")
73+
Failure(new RuntimeException(s"Failed to read response from Instance Registry, server returned $status"))
74+
}
75+
} recover { case ex =>
76+
log.warning(s"Failed to request ElasticSearch instance from Instance Registry, exception: $ex ")
77+
Failure(ex)
78+
}, Duration.Inf)
79+
}
80+
}
81+
82+
def sendMatchingResult(isElasticSearchReachable : Boolean, configuration: Configuration) : Try[Unit] = {
83+
84+
if(!configuration.usingInstanceRegistry) {
85+
Failure(new RuntimeException("Cannot post matching result to Instance Registry, no Instance Registry available."))
86+
} else {
87+
if(configuration.elasticsearchInstance.id.isEmpty) {
88+
Failure(new RuntimeException("Cannot post matching result to Instance Registry, assigned ElasticSearch instance has no ID."))
89+
} else {
90+
val idToPost = configuration.elasticsearchInstance.id.getOrElse(-1L)
91+
val request = HttpRequest(
92+
method = HttpMethods.POST,
93+
configuration.instanceRegistryUri + s"/matchingResult?Id=$idToPost&MatchingSuccessful=$isElasticSearchReachable")
94+
95+
Await.result(Http(system).singleRequest(request) map {response =>
96+
val status=response.status
97+
if(response.status == StatusCodes.OK){
98+
log.info(s"Successfully posted matching result to Instance Registry.")
99+
Success()
100+
}
101+
else {
102+
val statuscode = response.status
103+
log.warning(s"Failed to post matching result to Instance Registry, server returned $statuscode")
104+
Failure(new RuntimeException(s"Failed to post matching result to Instance Registry, server returned $statuscode"))
105+
}
106+
107+
} recover {case ex =>
108+
log.warning(s"Failed to post matching result to Instance Registry, exception: $ex")
109+
Failure(new RuntimeException(s"Failed to post matching result tot Instance Registry, exception: $ex"))
110+
}, Duration.Inf)
111+
}
112+
}
113+
114+
}
115+
116+
def deregister(configuration: Configuration) : Try[Unit] = {
117+
if(!configuration.usingInstanceRegistry){
118+
Failure(new RuntimeException("Cannot deregister from Instance Registry, no Instance Registry available."))
119+
} else {
120+
val id : Long = configuration.assignedID.getOrElse(-1L)
121+
122+
val request = HttpRequest(method = HttpMethods.POST, configuration.instanceRegistryUri + s"/deregister?Id=$id")
123+
124+
Await.result(Http(system).singleRequest(request) map {response =>
125+
if(response.status == StatusCodes.OK){
126+
log.info("Successfully deregistered from Instance Registry.")
127+
Success()
128+
}
129+
else {
130+
val statuscode = response.status
131+
log.warning(s"Failed to deregister from Instance Registry, server returned $statuscode")
132+
Failure(new RuntimeException(s"Failed to deregister from Instance Registry, server returned $statuscode"))
133+
}
134+
135+
} recover {case ex =>
136+
log.warning(s"Failed to deregister to Instance Registry, exception: $ex")
137+
Failure(ex)
138+
}, Duration.Inf)
139+
}
140+
}
141+
142+
def postInstance(instance : Instance, uri: String) () : Future[HttpResponse] =
143+
Marshal(instance).to[RequestEntity] flatMap { entity =>
144+
val request = HttpRequest(method = HttpMethods.POST, uri = uri, entity = entity)
145+
Http(system).singleRequest(request)
146+
}
147+
148+
149+
private def createInstance(id: Option[Long], controlPort : Int, name : String) : Instance =
150+
Instance(id, InetAddress.getLocalHost.getHostAddress, controlPort, name, ComponentType.WebApi)
151+
}

0 commit comments

Comments
 (0)