Skip to content

Commit d47316d

Browse files
committed
Implementation of functions with respect to the new API. Other funtions are working fine, unable to test reportstop() and resportstart() from inside the container by exporting enviournment variable -INSTANCE_ID- refs #23
1 parent f56a54d commit d47316d

File tree

4 files changed

+280
-46
lines changed

4 files changed

+280
-46
lines changed

delphi-webapi

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Subproject commit ed901bf01811ac64126906ab10340da0f87fe0e8
Lines changed: 58 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,45 +1,85 @@
1+
// Copyright (C) 2018 The Delphi Team.
2+
// See the LICENCE file distributed with this work for additional
3+
// information regarding copyright ownership.
4+
//
5+
// Licensed under the Apache License, Version 2.0 (the "License");
6+
// you may not use this file except in compliance with the License.
7+
// You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing, software
12+
// distributed under the License is distributed on an "AS IS" BASIS,
13+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
// See the License for the specific language governing permissions and
15+
// limitations under the License.
116
package de.upb.cs.swt.delphi.instancemanagement
2-
317
import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport
4-
import spray.json.{DefaultJsonProtocol, JsString, JsValue, JsonFormat}
18+
import spray.json.{DefaultJsonProtocol, DeserializationException, JsString, JsValue, JsonFormat}
519

620
trait JsonSupport extends SprayJsonSupport with DefaultJsonProtocol {
7-
implicit val componentTypeFormat = new JsonFormat[InstanceEnums.ComponentType] {
21+
22+
implicit val componentTypeFormat : JsonFormat[InstanceEnums.ComponentType] = new JsonFormat[InstanceEnums.ComponentType] {
23+
824
def write(compType : InstanceEnums.ComponentType) = JsString(compType.toString)
925

10-
def read(value: JsValue) = value match {
26+
def read(value: JsValue) : InstanceEnums.ComponentType = value match {
1127
case JsString(s) => s match {
1228
case "Crawler" => InstanceEnums.ComponentType.Crawler
1329
case "WebApi" => InstanceEnums.ComponentType.WebApi
1430
case "WebApp" => InstanceEnums.ComponentType.WebApp
1531
case "DelphiManagement" => InstanceEnums.ComponentType.DelphiManagement
1632
case "ElasticSearch" => InstanceEnums.ComponentType.ElasticSearch
17-
case x => throw new RuntimeException(s"Unexpected string value $x for component type.")
33+
case x => throw DeserializationException(s"Unexpected string value $x for component type.")
1834
}
19-
case y => throw new RuntimeException(s"Unexpected type $y while deserializing component type.")
35+
case y => throw DeserializationException(s"Unexpected type $y while deserializing component type.")
2036
}
2137
}
22-
implicit val instanceFormat = jsonFormat5(Instance)
38+
39+
implicit val stateFormat : JsonFormat[InstanceEnums.State] = new JsonFormat[InstanceEnums.State] {
40+
41+
def write(compType : InstanceEnums.State) = JsString(compType.toString)
42+
43+
def read(value: JsValue) : InstanceEnums.State = value match {
44+
case JsString(s) => s match {
45+
case "Running" => InstanceEnums.InstanceState.Running
46+
case "Stopped" => InstanceEnums.InstanceState.Stopped
47+
case "Failed" => InstanceEnums.InstanceState.Failed
48+
case "Paused" => InstanceEnums.InstanceState.Paused
49+
case "NotReachable" => InstanceEnums.InstanceState.NotReachable
50+
case x => throw DeserializationException(s"Unexpected string value $x for instance state.")
51+
}
52+
case y => throw DeserializationException(s"Unexpected type $y while deserializing instance state.")
53+
}
54+
}
55+
56+
implicit val instanceFormat : JsonFormat[Instance] = jsonFormat7(Instance)
2357
}
2458

2559
final case class Instance (
2660
id: Option[Long],
2761
host: String,
28-
portNumber: Int,
62+
portNumber: Long,
2963
name: String,
30-
/* Component Type */
31-
componentType: InstanceEnums.ComponentType
64+
componentType: InstanceEnums.ComponentType,
65+
dockerId: Option[String],
66+
instanceState: InstanceEnums.State
3267
)
33-
3468
object InstanceEnums {
35-
3669
type ComponentType = ComponentType.Value
3770
object ComponentType extends Enumeration {
38-
val Crawler = Value("Crawler")
39-
val WebApi = Value("WebApi")
40-
val WebApp = Value("WebApp")
41-
val DelphiManagement = Value("DelphiManagement")
42-
val ElasticSearch = Value("ElasticSearch")
71+
val Crawler : Value = Value("Crawler")
72+
val ElasticSearch : Value = Value("ElasticSearch")
73+
val WebApi : Value = Value("WebApi")
74+
val WebApp : Value = Value("WebApp")
75+
val DelphiManagement : Value = Value("DelphiManagement")
76+
}
77+
type State = InstanceState.Value
78+
object InstanceState extends Enumeration {
79+
val Running : Value = Value("Running")
80+
val Paused : Value = Value("Paused")
81+
val Stopped : Value = Value("Stopped")
82+
val Failed : Value = Value("Failed")
83+
val NotReachable : Value = Value("NotReachable")
4384
}
44-
4585
}

src/main/scala/de/upb/cs/swt/delphi/instancemanagement/InstanceRegistry.scala

Lines changed: 199 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,36 @@
1+
// Copyright (C) 2018 The Delphi Team.
2+
// See the LICENCE file distributed with this work for additional
3+
// information regarding copyright ownership.
4+
//
5+
// Licensed under the Apache License, Version 2.0 (the "License");
6+
// you may not use this file except in compliance with the License.
7+
// You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing, software
12+
// distributed under the License is distributed on an "AS IS" BASIS,
13+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
// See the License for the specific language governing permissions and
15+
// limitations under the License.
16+
117
package de.upb.cs.swt.delphi.instancemanagement
218

319
import java.net.InetAddress
420

521
import akka.actor.ActorSystem
622
import akka.http.scaladsl.Http
7-
import akka.http.scaladsl.marshalling.Marshal
823
import akka.http.scaladsl.model._
924
import akka.http.scaladsl.unmarshalling.Unmarshal
1025
import akka.stream.ActorMaterializer
11-
import de.upb.cs.swt.delphi.instancemanagement.InstanceEnums.ComponentType
26+
import akka.util.ByteString
27+
import de.upb.cs.swt.delphi.instancemanagement.InstanceEnums.{ComponentType, InstanceState}
1228
import de.upb.cs.swt.delphi.webapi.{AppLogging, Configuration, Server}
1329

1430
import scala.concurrent.{Await, ExecutionContext, Future}
15-
import scala.concurrent.duration.Duration
31+
import scala.concurrent.duration._
1632
import scala.util.{Failure, Success, Try}
33+
import spray.json._
1734

1835
object InstanceRegistry extends JsonSupport with AppLogging
1936
{
@@ -22,8 +39,85 @@ object InstanceRegistry extends JsonSupport with AppLogging
2239
implicit val ec : ExecutionContext = system.dispatcher
2340
implicit val materializer : ActorMaterializer = Server.materializer
2441

42+
lazy val instanceIdFromEnv : Option[Long] = Try[Long](sys.env("INSTANCE_ID").toLong).toOption
43+
44+
45+
def handleInstanceStart(configuration: Configuration) : Option[Long] = {
46+
instanceIdFromEnv match {
47+
case Some(id) =>
48+
reportStart(configuration) match {
49+
case Success(_) => Some(id)
50+
case Failure(_) => None
51+
}
52+
case None =>
53+
register(configuration) match {
54+
case Success(id) => Some(id)
55+
case Failure(_) => None
56+
}
57+
}
58+
}
2559

26-
def register(configuration: Configuration) : Try[Long] = {
60+
def handleInstanceStop(configuration: Configuration) : Try[Unit] = {
61+
if(instanceIdFromEnv.isDefined) {
62+
reportStop(configuration)
63+
} else {
64+
deregister(configuration)
65+
}
66+
}
67+
68+
def handleInstanceFailure(configuration: Configuration) : Try[Unit] = {
69+
if(instanceIdFromEnv.isDefined) {
70+
reportFailure(configuration)
71+
} else {
72+
deregister(configuration)
73+
}
74+
}
75+
76+
def reportStart(configuration: Configuration) : Try[Unit] = executeReportOperation(configuration, ReportOperationType.Start)
77+
78+
def reportStop(configuration: Configuration) : Try[Unit] = {
79+
if(configuration.usingInstanceRegistry) {
80+
executeReportOperation(configuration, ReportOperationType.Stop)
81+
} else {
82+
Failure(new RuntimeException("Cannot report stop, no instance registry available."))
83+
}
84+
}
85+
86+
def reportFailure(configuration: Configuration) : Try[Unit] = {
87+
if(configuration.usingInstanceRegistry){
88+
executeReportOperation(configuration, ReportOperationType.Failure)
89+
} else {
90+
Failure(new RuntimeException("Cannot report failure, no instance registry available."))
91+
}
92+
}
93+
94+
private def executeReportOperation(configuration: Configuration, operationType: ReportOperationType.Value) : Try[Unit] = {
95+
instanceIdFromEnv match {
96+
case Some(id) =>
97+
val request = HttpRequest(
98+
method = HttpMethods.POST,
99+
configuration.instanceRegistryUri + ReportOperationType.toOperationUriString(operationType, id))
100+
101+
Await.result(Http(system).singleRequest(request) map {response =>
102+
if(response.status == StatusCodes.OK){
103+
log.info(s"Successfully reported ${operationType.toString} to Instance Registry.")
104+
Success()
105+
}
106+
else {
107+
log.warning(s"Failed to report ${operationType.toString} to Instance Registry, server returned ${response.status}")
108+
Failure(new RuntimeException(s"Failed to report ${operationType.toString} to Instance Registry, server returned ${response.status}"))
109+
}
110+
111+
} recover {case ex =>
112+
log.warning(s"Failed to report ${operationType.toString} to Instance Registry, exception: $ex")
113+
Failure(new RuntimeException(s"Failed to report ${operationType.toString} to Instance Registry, exception: $ex"))
114+
}, Duration.Inf)
115+
case None =>
116+
log.warning(s"Cannot report ${operationType.toString} to Instance Registry, no instance id is present in env var 'INSTANCE_ID'.")
117+
Failure(new RuntimeException(s"Cannot report ${operationType.toString} to Instance Registry, no instance id is present in env var 'INSTANCE_ID'."))
118+
}
119+
}
120+
def register(configuration: Configuration) :Try[Long] = {
27121
val instance = createInstance(None,configuration.bindPort, configuration.instanceName)
28122

29123
Await.result(postInstance(instance, configuration.instanceRegistryUri + "/register") map {response =>
@@ -56,21 +150,26 @@ object InstanceRegistry extends JsonSupport with AppLogging
56150
val request = HttpRequest(method = HttpMethods.GET, configuration.instanceRegistryUri + "/matchingInstance?ComponentType=ElasticSearch")
57151

58152
Await.result(Http(system).singleRequest(request) map {response =>
59-
val status = response.status
60-
if(status == StatusCodes.OK) {
61-
62-
Await.result(Unmarshal(response.entity).to[Instance] map {instance =>
63-
val elasticIP = instance.host
64-
log.info(s"Instance Registry assigned ElasticSearch instance at $elasticIP ")
65-
Success(instance)
66-
} recover {case ex =>
67-
log.warning(s"Failed to read response from Instance Registry, exception: $ex")
68-
Failure(ex)
69-
}, Duration.Inf)
70-
}
71-
else{
72-
log.warning(s"Failed to read response from Instance Registry, server returned $status")
73-
Failure(new RuntimeException(s"Failed to read response from Instance Registry, server returned $status"))
153+
response.status match {
154+
case StatusCodes.OK =>
155+
try {
156+
val instanceString : String = Await.result(response.entity.dataBytes.runFold(ByteString(""))(_ ++ _).map(_.utf8String), 5 seconds)
157+
val esInstance = instanceString.parseJson.convertTo[Instance](instanceFormat)
158+
val elasticIP = esInstance.host
159+
log.info(s"Instance Registry assigned ElasticSearch instance at $elasticIP")
160+
Success(esInstance)
161+
} catch {
162+
case px: spray.json.JsonParser.ParsingException =>
163+
log.warning(s"Failed to read response from Instance Registry, exception: $px")
164+
Failure(px)
165+
}
166+
case StatusCodes.NotFound =>
167+
log.warning(s"No matching instance of type 'ElasticSearch' is present at the instance registry.")
168+
Failure(new RuntimeException(s"Instance Registry did not contain matching instance, server returned ${StatusCodes.NotFound}"))
169+
case _ =>
170+
val status = response.status
171+
log.warning(s"Failed to read matching instance from Instance Registry, server returned $status")
172+
Failure(new RuntimeException(s"Failed to read matching instance from Instance Registry, server returned $status"))
74173
}
75174
} recover { case ex =>
76175
log.warning(s"Failed to request ElasticSearch instance from Instance Registry, exception: $ex ")
@@ -139,13 +238,89 @@ object InstanceRegistry extends JsonSupport with AppLogging
139238
}
140239
}
141240

142-
def postInstance(instance : Instance, uri: String) () : Future[HttpResponse] =
143-
Marshal(instance).to[RequestEntity] flatMap { entity =>
144-
val request = HttpRequest(method = HttpMethods.POST, uri = uri, entity = entity)
241+
def postInstance(instance : Instance, uri: String) () : Future[HttpResponse] = {
242+
try {
243+
val request = HttpRequest(method = HttpMethods.POST, uri = uri, entity = instance.toJson(instanceFormat).toString())
145244
Http(system).singleRequest(request)
245+
} catch {
246+
case dx: DeserializationException =>
247+
log.warning(s"Failed to deregister to Instance Registry, exception: $dx")
248+
Future.failed(dx)
146249
}
250+
}
147251

148252

149253
private def createInstance(id: Option[Long], controlPort : Int, name : String) : Instance =
150-
Instance(id, InetAddress.getLocalHost.getHostAddress, controlPort, name, ComponentType.WebApi)
151-
}
254+
Instance(id, InetAddress.getLocalHost.getHostAddress,
255+
controlPort, name, ComponentType.WebApi, None, InstanceState.Running)
256+
257+
def reportStart(id: String, configuration: Configuration):Try[ResponseEntity] ={
258+
val request = HttpRequest(method = HttpMethods.GET, configuration.instanceRegistryUri + "/reportStart")
259+
Await.result(Http(system).singleRequest(request) map {response =>
260+
if(response.status == StatusCodes.OK){
261+
Success(response.entity)
262+
}
263+
else {
264+
val statuscode = response.status
265+
log.warning(s"Failed to perform reportStart, server returned $statuscode")
266+
Failure(new RuntimeException(s"Failed to perform reportStart, server returned $statuscode"))
267+
}
268+
} recover {case ex =>
269+
log.warning(s"Failed to perform reportStart, exception: $ex")
270+
Failure(new RuntimeException(s"Failed to perform reportStart, server returned, exception: $ex"))
271+
}, Duration.Inf)
272+
}
273+
274+
def reportFailure(id: String, configuration: Configuration):Try[ResponseEntity] = {
275+
276+
val request = HttpRequest(method = HttpMethods.GET, configuration.instanceRegistryUri + "/reportFailure")
277+
Await.result(Http(system).singleRequest(request) map {response =>
278+
if(response.status == StatusCodes.OK){
279+
Success(response.entity)
280+
}
281+
else {
282+
val statuscode = response.status
283+
log.warning(s"Failed to perform reportFailure, server returned $statuscode")
284+
Failure(new RuntimeException(s"Failed to perform reportFailure, server returned $statuscode"))
285+
}
286+
} recover {case ex =>
287+
log.warning(s"Failed to perform reportFailure, server returned, exception: $ex")
288+
Failure(new RuntimeException(s"Failed to perform reportFailure, server returned, exception: $ex"))
289+
}, Duration.Inf)
290+
}
291+
292+
def reportStop(id: String, configuration: Configuration):Try[ResponseEntity] = {
293+
294+
val request = HttpRequest(method = HttpMethods.GET, configuration.instanceRegistryUri + "/reportStop")
295+
Await.result(Http(system).singleRequest(request) map {response =>
296+
if(response.status == StatusCodes.OK){
297+
Success(response.entity)
298+
}
299+
else {
300+
val statuscode = response.status
301+
log.warning(s"Failed to perform reportStop, server returned $statuscode")
302+
Failure(new RuntimeException(s"Failed to perform reportStop, server returned $statuscode"))
303+
}
304+
} recover {case ex =>
305+
log.warning(s"Failed to perform reportStop, server returned, exception: $ex")
306+
Failure(new RuntimeException(s"Failed to perform reportStop, server returned, exception: $ex"))
307+
}, Duration.Inf)
308+
}
309+
310+
object ReportOperationType extends Enumeration {
311+
val Start : Value = Value("Start")
312+
val Stop : Value = Value("Stop")
313+
val Failure : Value = Value("Failure")
314+
315+
def toOperationUriString(operation: ReportOperationType.Value, id: Long) : String = {
316+
operation match {
317+
case Start =>
318+
s"/reportStart?Id=$id"
319+
case Stop =>
320+
s"/reportStop?Id=$id"
321+
case _ =>
322+
s"/reportFailure?Id=$id"
323+
}
324+
}
325+
}
326+
}

0 commit comments

Comments
 (0)