Skip to content

Commit 28c6187

Browse files
authored
Merge pull request #26 from delphi-hub/newInstanceRegistry
New instance registry
2 parents 1677b67 + 6ce7899 commit 28c6187

File tree

5 files changed

+289
-49
lines changed

5 files changed

+289
-49
lines changed

delphi-webapi

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Subproject commit ed901bf01811ac64126906ab10340da0f87fe0e8
Lines changed: 58 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,44 +1,85 @@
1+
// Copyright (C) 2018 The Delphi Team.
2+
// See the LICENCE file distributed with this work for additional
3+
// information regarding copyright ownership.
4+
//
5+
// Licensed under the Apache License, Version 2.0 (the "License");
6+
// you may not use this file except in compliance with the License.
7+
// You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing, software
12+
// distributed under the License is distributed on an "AS IS" BASIS,
13+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
// See the License for the specific language governing permissions and
15+
// limitations under the License.
116
package de.upb.cs.swt.delphi.instancemanagement
2-
317
import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport
4-
import spray.json.{DefaultJsonProtocol, JsString, JsValue, JsonFormat}
18+
import spray.json.{DefaultJsonProtocol, DeserializationException, JsString, JsValue, JsonFormat}
519

620
trait JsonSupport extends SprayJsonSupport with DefaultJsonProtocol {
7-
implicit val componentTypeFormat = new JsonFormat[InstanceEnums.ComponentType] {
21+
22+
implicit val componentTypeFormat : JsonFormat[InstanceEnums.ComponentType] = new JsonFormat[InstanceEnums.ComponentType] {
23+
824
def write(compType : InstanceEnums.ComponentType) = JsString(compType.toString)
925

10-
def read(value: JsValue) = value match {
26+
def read(value: JsValue) : InstanceEnums.ComponentType = value match {
1127
case JsString(s) => s match {
1228
case "Crawler" => InstanceEnums.ComponentType.Crawler
1329
case "WebApi" => InstanceEnums.ComponentType.WebApi
1430
case "WebApp" => InstanceEnums.ComponentType.WebApp
1531
case "DelphiManagement" => InstanceEnums.ComponentType.DelphiManagement
1632
case "ElasticSearch" => InstanceEnums.ComponentType.ElasticSearch
17-
case x => throw new RuntimeException(s"Unexpected string value $x for component type.")
33+
case x => throw DeserializationException(s"Unexpected string value $x for component type.")
1834
}
19-
case y => throw new RuntimeException(s"Unexpected type $y while deserializing component type.")
35+
case y => throw DeserializationException(s"Unexpected type $y while deserializing component type.")
2036
}
2137
}
22-
implicit val instanceFormat = jsonFormat5(Instance)
38+
39+
implicit val stateFormat : JsonFormat[InstanceEnums.State] = new JsonFormat[InstanceEnums.State] {
40+
41+
def write(compType : InstanceEnums.State) = JsString(compType.toString)
42+
43+
def read(value: JsValue) : InstanceEnums.State = value match {
44+
case JsString(s) => s match {
45+
case "Running" => InstanceEnums.InstanceState.Running
46+
case "Stopped" => InstanceEnums.InstanceState.Stopped
47+
case "Failed" => InstanceEnums.InstanceState.Failed
48+
case "Paused" => InstanceEnums.InstanceState.Paused
49+
case "NotReachable" => InstanceEnums.InstanceState.NotReachable
50+
case x => throw DeserializationException(s"Unexpected string value $x for instance state.")
51+
}
52+
case y => throw DeserializationException(s"Unexpected type $y while deserializing instance state.")
53+
}
54+
}
55+
56+
implicit val instanceFormat : JsonFormat[Instance] = jsonFormat7(Instance)
2357
}
2458

2559
final case class Instance (
2660
id: Option[Long],
2761
host: String,
28-
portNumber: Int,
62+
portNumber: Long,
2963
name: String,
30-
componentType: InstanceEnums.ComponentType
64+
componentType: InstanceEnums.ComponentType,
65+
dockerId: Option[String],
66+
instanceState: InstanceEnums.State
3167
)
32-
3368
object InstanceEnums {
34-
3569
type ComponentType = ComponentType.Value
3670
object ComponentType extends Enumeration {
37-
val Crawler = Value("Crawler")
38-
val WebApi = Value("WebApi")
39-
val WebApp = Value("WebApp")
40-
val DelphiManagement = Value("DelphiManagement")
41-
val ElasticSearch = Value("ElasticSearch")
71+
val Crawler : Value = Value("Crawler")
72+
val ElasticSearch : Value = Value("ElasticSearch")
73+
val WebApi : Value = Value("WebApi")
74+
val WebApp : Value = Value("WebApp")
75+
val DelphiManagement : Value = Value("DelphiManagement")
76+
}
77+
type State = InstanceState.Value
78+
object InstanceState extends Enumeration {
79+
val Running : Value = Value("Running")
80+
val Paused : Value = Value("Paused")
81+
val Stopped : Value = Value("Stopped")
82+
val Failed : Value = Value("Failed")
83+
val NotReachable : Value = Value("NotReachable")
4284
}
43-
4485
}

src/main/scala/de/upb/cs/swt/delphi/instancemanagement/InstanceRegistry.scala

Lines changed: 198 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,36 @@
1+
// Copyright (C) 2018 The Delphi Team.
2+
// See the LICENCE file distributed with this work for additional
3+
// information regarding copyright ownership.
4+
//
5+
// Licensed under the Apache License, Version 2.0 (the "License");
6+
// you may not use this file except in compliance with the License.
7+
// You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing, software
12+
// distributed under the License is distributed on an "AS IS" BASIS,
13+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
// See the License for the specific language governing permissions and
15+
// limitations under the License.
16+
117
package de.upb.cs.swt.delphi.instancemanagement
218

319
import java.net.InetAddress
420

521
import akka.actor.ActorSystem
622
import akka.http.scaladsl.Http
7-
import akka.http.scaladsl.marshalling.Marshal
823
import akka.http.scaladsl.model._
924
import akka.http.scaladsl.unmarshalling.Unmarshal
1025
import akka.stream.ActorMaterializer
11-
import de.upb.cs.swt.delphi.instancemanagement.InstanceEnums.ComponentType
26+
import akka.util.ByteString
27+
import de.upb.cs.swt.delphi.instancemanagement.InstanceEnums.{ComponentType, InstanceState}
1228
import de.upb.cs.swt.delphi.webapi.{AppLogging, Configuration, Server}
1329

1430
import scala.concurrent.{Await, ExecutionContext, Future}
15-
import scala.concurrent.duration.Duration
31+
import scala.concurrent.duration._
1632
import scala.util.{Failure, Success, Try}
33+
import spray.json._
1734

1835
object InstanceRegistry extends JsonSupport with AppLogging
1936
{
@@ -22,8 +39,85 @@ object InstanceRegistry extends JsonSupport with AppLogging
2239
implicit val ec : ExecutionContext = system.dispatcher
2340
implicit val materializer : ActorMaterializer = Server.materializer
2441

42+
lazy val instanceIdFromEnv : Option[Long] = Try[Long](sys.env("INSTANCE_ID").toLong).toOption
43+
44+
45+
def handleInstanceStart(configuration: Configuration) : Option[Long] = {
46+
instanceIdFromEnv match {
47+
case Some(id) =>
48+
reportStart(configuration) match {
49+
case Success(_) => Some(id)
50+
case Failure(_) => None
51+
}
52+
case None =>
53+
register(configuration) match {
54+
case Success(id) => Some(id)
55+
case Failure(_) => None
56+
}
57+
}
58+
}
2559

26-
def register(configuration: Configuration) : Try[Long] = {
60+
def handleInstanceStop(configuration: Configuration) : Try[Unit] = {
61+
if(instanceIdFromEnv.isDefined) {
62+
reportStop(configuration)
63+
} else {
64+
deregister(configuration)
65+
}
66+
}
67+
68+
def handleInstanceFailure(configuration: Configuration) : Try[Unit] = {
69+
if(instanceIdFromEnv.isDefined) {
70+
reportFailure(configuration)
71+
} else {
72+
deregister(configuration)
73+
}
74+
}
75+
76+
def reportStart(configuration: Configuration) : Try[Unit] = executeReportOperation(configuration, ReportOperationType.Start)
77+
78+
def reportStop(configuration: Configuration) : Try[Unit] = {
79+
if(configuration.usingInstanceRegistry) {
80+
executeReportOperation(configuration, ReportOperationType.Stop)
81+
} else {
82+
Failure(new RuntimeException("Cannot report stop, no instance registry available."))
83+
}
84+
}
85+
86+
def reportFailure(configuration: Configuration) : Try[Unit] = {
87+
if(configuration.usingInstanceRegistry){
88+
executeReportOperation(configuration, ReportOperationType.Failure)
89+
} else {
90+
Failure(new RuntimeException("Cannot report failure, no instance registry available."))
91+
}
92+
}
93+
94+
private def executeReportOperation(configuration: Configuration, operationType: ReportOperationType.Value) : Try[Unit] = {
95+
instanceIdFromEnv match {
96+
case Some(id) =>
97+
val request = HttpRequest(
98+
method = HttpMethods.POST,
99+
configuration.instanceRegistryUri + ReportOperationType.toOperationUriString(operationType, id))
100+
101+
Await.result(Http(system).singleRequest(request) map {response =>
102+
if(response.status == StatusCodes.OK){
103+
log.info(s"Successfully reported ${operationType.toString} to Instance Registry.")
104+
Success()
105+
}
106+
else {
107+
log.warning(s"Failed to report ${operationType.toString} to Instance Registry, server returned ${response.status}")
108+
Failure(new RuntimeException(s"Failed to report ${operationType.toString} to Instance Registry, server returned ${response.status}"))
109+
}
110+
111+
} recover {case ex =>
112+
log.warning(s"Failed to report ${operationType.toString} to Instance Registry, exception: $ex")
113+
Failure(new RuntimeException(s"Failed to report ${operationType.toString} to Instance Registry, exception: $ex"))
114+
}, Duration.Inf)
115+
case None =>
116+
log.warning(s"Cannot report ${operationType.toString} to Instance Registry, no instance id is present in env var 'INSTANCE_ID'.")
117+
Failure(new RuntimeException(s"Cannot report ${operationType.toString} to Instance Registry, no instance id is present in env var 'INSTANCE_ID'."))
118+
}
119+
}
120+
def register(configuration: Configuration) :Try[Long] = {
27121
val instance = createInstance(None,configuration.bindPort, configuration.instanceName)
28122

29123
Await.result(postInstance(instance, configuration.instanceRegistryUri + "/register") map {response =>
@@ -56,21 +150,26 @@ object InstanceRegistry extends JsonSupport with AppLogging
56150
val request = HttpRequest(method = HttpMethods.GET, configuration.instanceRegistryUri + "/matchingInstance?ComponentType=ElasticSearch")
57151

58152
Await.result(Http(system).singleRequest(request) map {response =>
59-
val status = response.status
60-
if(status == StatusCodes.OK) {
61-
62-
Await.result(Unmarshal(response.entity).to[Instance] map {instance =>
63-
val elasticIP = instance.host
64-
log.info(s"Instance Registry assigned ElasticSearch instance at $elasticIP ")
65-
Success(instance)
66-
} recover {case ex =>
67-
log.warning(s"Failed to read response from Instance Registry, exception: $ex")
68-
Failure(ex)
69-
}, Duration.Inf)
70-
}
71-
else{
72-
log.warning(s"Failed to read response from Instance Registry, server returned $status")
73-
Failure(new RuntimeException(s"Failed to read response from Instance Registry, server returned $status"))
153+
response.status match {
154+
case StatusCodes.OK =>
155+
try {
156+
val instanceString : String = Await.result(response.entity.dataBytes.runFold(ByteString(""))(_ ++ _).map(_.utf8String), 5 seconds)
157+
val esInstance = instanceString.parseJson.convertTo[Instance](instanceFormat)
158+
val elasticIP = esInstance.host
159+
log.info(s"Instance Registry assigned ElasticSearch instance at $elasticIP")
160+
Success(esInstance)
161+
} catch {
162+
case px: spray.json.JsonParser.ParsingException =>
163+
log.warning(s"Failed to read response from Instance Registry, exception: $px")
164+
Failure(px)
165+
}
166+
case StatusCodes.NotFound =>
167+
log.warning(s"No matching instance of type 'ElasticSearch' is present at the instance registry.")
168+
Failure(new RuntimeException(s"Instance Registry did not contain matching instance, server returned ${StatusCodes.NotFound}"))
169+
case _ =>
170+
val status = response.status
171+
log.warning(s"Failed to read matching instance from Instance Registry, server returned $status")
172+
Failure(new RuntimeException(s"Failed to read matching instance from Instance Registry, server returned $status"))
74173
}
75174
} recover { case ex =>
76175
log.warning(s"Failed to request ElasticSearch instance from Instance Registry, exception: $ex ")
@@ -139,13 +238,89 @@ object InstanceRegistry extends JsonSupport with AppLogging
139238
}
140239
}
141240

142-
def postInstance(instance : Instance, uri: String) () : Future[HttpResponse] =
143-
Marshal(instance).to[RequestEntity] flatMap { entity =>
144-
val request = HttpRequest(method = HttpMethods.POST, uri = uri, entity = entity)
241+
def postInstance(instance : Instance, uri: String) () : Future[HttpResponse] = {
242+
try {
243+
val request = HttpRequest(method = HttpMethods.POST, uri = uri, entity = instance.toJson(instanceFormat).toString())
145244
Http(system).singleRequest(request)
245+
} catch {
246+
case dx: DeserializationException =>
247+
log.warning(s"Failed to deregister to Instance Registry, exception: $dx")
248+
Future.failed(dx)
146249
}
250+
}
147251

148252

149253
private def createInstance(id: Option[Long], controlPort : Int, name : String) : Instance =
150-
Instance(id, InetAddress.getLocalHost.getHostAddress, controlPort, name, ComponentType.WebApi)
254+
Instance(id, InetAddress.getLocalHost.getHostAddress,
255+
controlPort, name, ComponentType.WebApi, None, InstanceState.Running)
256+
257+
def reportStart(id: String, configuration: Configuration):Try[ResponseEntity] ={
258+
val request = HttpRequest(method = HttpMethods.GET, configuration.instanceRegistryUri + "/reportStart")
259+
Await.result(Http(system).singleRequest(request) map {response =>
260+
if(response.status == StatusCodes.OK){
261+
Success(response.entity)
262+
}
263+
else {
264+
val statuscode = response.status
265+
log.warning(s"Failed to perform reportStart, server returned $statuscode")
266+
Failure(new RuntimeException(s"Failed to perform reportStart, server returned $statuscode"))
267+
}
268+
} recover {case ex =>
269+
log.warning(s"Failed to perform reportStart, exception: $ex")
270+
Failure(new RuntimeException(s"Failed to perform reportStart, server returned, exception: $ex"))
271+
}, Duration.Inf)
272+
}
273+
274+
def reportFailure(id: String, configuration: Configuration):Try[ResponseEntity] = {
275+
276+
val request = HttpRequest(method = HttpMethods.GET, configuration.instanceRegistryUri + "/reportFailure")
277+
Await.result(Http(system).singleRequest(request) map {response =>
278+
if(response.status == StatusCodes.OK){
279+
Success(response.entity)
280+
}
281+
else {
282+
val statuscode = response.status
283+
log.warning(s"Failed to perform reportFailure, server returned $statuscode")
284+
Failure(new RuntimeException(s"Failed to perform reportFailure, server returned $statuscode"))
285+
}
286+
} recover {case ex =>
287+
log.warning(s"Failed to perform reportFailure, server returned, exception: $ex")
288+
Failure(new RuntimeException(s"Failed to perform reportFailure, server returned, exception: $ex"))
289+
}, Duration.Inf)
290+
}
291+
292+
def reportStop(id: String, configuration: Configuration):Try[ResponseEntity] = {
293+
294+
val request = HttpRequest(method = HttpMethods.GET, configuration.instanceRegistryUri + "/reportStop")
295+
Await.result(Http(system).singleRequest(request) map {response =>
296+
if(response.status == StatusCodes.OK){
297+
Success(response.entity)
298+
}
299+
else {
300+
val statuscode = response.status
301+
log.warning(s"Failed to perform reportStop, server returned $statuscode")
302+
Failure(new RuntimeException(s"Failed to perform reportStop, server returned $statuscode"))
303+
}
304+
} recover {case ex =>
305+
log.warning(s"Failed to perform reportStop, server returned, exception: $ex")
306+
Failure(new RuntimeException(s"Failed to perform reportStop, server returned, exception: $ex"))
307+
}, Duration.Inf)
308+
}
309+
310+
object ReportOperationType extends Enumeration {
311+
val Start : Value = Value("Start")
312+
val Stop : Value = Value("Stop")
313+
val Failure : Value = Value("Failure")
314+
315+
def toOperationUriString(operation: ReportOperationType.Value, id: Long) : String = {
316+
operation match {
317+
case Start =>
318+
s"/reportStart?Id=$id"
319+
case Stop =>
320+
s"/reportStop?Id=$id"
321+
case _ =>
322+
s"/reportFailure?Id=$id"
323+
}
324+
}
325+
}
151326
}

0 commit comments

Comments
 (0)