Skip to content

Commit e9b8daf

Browse files
authored
Merge pull request #35 from delphi-hub/feature/authentication
Introduced JWT based authorization for registry communication
2 parents d9bdabb + 9331aee commit e9b8daf

File tree

4 files changed

+56
-7
lines changed

4 files changed

+56
-7
lines changed

build.sbt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,8 @@ libraryDependencies ++= Seq(
5555

5656
libraryDependencies += "org.json4s" %% "json4s-jackson" % "3.5.3"
5757

58+
libraryDependencies += "com.pauldijou" %% "jwt-core" % "1.0.0"
59+
5860
libraryDependencies += "ch.qos.logback" % "logback-classic" % "1.2.3" % Runtime
5961

6062
val elastic4sVersion = "6.3.0"

src/main/scala/de/upb/cs/swt/delphi/crawler/Configuration.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,8 @@ class Configuration {
9696

9797
lazy val instanceId : Option[Long] = InstanceRegistry.handleInstanceStart(this)
9898

99+
val jwtSecretKey: String = sys.env.getOrElse("DELPHI_JWT_SECRET","changeme")
100+
99101
case class Throttle(element : Int, per : FiniteDuration, maxBurst : Int, mode : ThrottleMode)
100102
}
101103

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
// Copyright (C) 2018 The Delphi Team.
2+
// See the LICENCE file distributed with this work for additional
3+
// information regarding copyright ownership.
4+
//
5+
// Licensed under the Apache License, Version 2.0 (the "License");
6+
// you may not use this file except in compliance with the License.
7+
// You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing, software
12+
// distributed under the License is distributed on an "AS IS" BASIS,
13+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
// See the License for the specific language governing permissions and
15+
// limitations under the License.
16+
package de.upb.cs.swt.delphi.crawler.authorization
17+
18+
import de.upb.cs.swt.delphi.crawler.Configuration
19+
import pdi.jwt.{Jwt, JwtAlgorithm, JwtClaim}
20+
21+
object AuthProvider {
22+
23+
def generateJwt(validFor: Long = 1, useGenericName: Boolean = false) (implicit configuration: Configuration): String = {
24+
val claim = JwtClaim()
25+
.issuedNow
26+
.expiresIn(validFor * 60)
27+
.startsNow
28+
. + ("user_id", if (useGenericName) configuration.instanceName else s"${configuration.instanceId.get}")
29+
. + ("user_type", "Component")
30+
31+
32+
Jwt.encode(claim, configuration.jwtSecretKey, JwtAlgorithm.HS256)
33+
}
34+
35+
}

src/main/scala/de/upb/cs/swt/delphi/crawler/instancemanagement/InstanceRegistry.scala

Lines changed: 17 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,11 @@ import java.net.InetAddress
2121
import akka.actor.ActorSystem
2222
import akka.http.scaladsl.Http
2323
import akka.http.scaladsl.model._
24+
import akka.http.scaladsl.model.headers.RawHeader
2425
import akka.http.scaladsl.unmarshalling.Unmarshal
2526
import akka.stream.ActorMaterializer
2627
import akka.util.ByteString
28+
import de.upb.cs.swt.delphi.crawler.authorization.AuthProvider
2729
import de.upb.cs.swt.delphi.crawler.instancemanagement.InstanceEnums.{ComponentType, InstanceState}
2830
import de.upb.cs.swt.delphi.crawler.{AppLogging, Configuration, Crawler}
2931

@@ -99,7 +101,10 @@ object InstanceRegistry extends InstanceJsonSupport with AppLogging
99101
method = HttpMethods.POST,
100102
configuration.instanceRegistryUri + ReportOperationType.toOperationUriString(operationType, id))
101103

102-
Await.result(Http(system).singleRequest(request) map {response =>
104+
val useGenericNameForToken = operationType == ReportOperationType.Start //Must use generic name for startup, no id known at that point
105+
106+
Await.result(Http(system).singleRequest(request.withHeaders(RawHeader("Authorization",
107+
s"Bearer ${AuthProvider.generateJwt(useGenericName = useGenericNameForToken)(configuration)}"))) map { response =>
103108
if(response.status == StatusCodes.OK){
104109
log.info(s"Successfully reported ${operationType.toString} to Instance Registry.")
105110
Success()
@@ -122,7 +127,7 @@ object InstanceRegistry extends InstanceJsonSupport with AppLogging
122127
private def register(configuration: Configuration) : Try[Long] = {
123128
val instance = createInstance(None,configuration.controlServerPort, configuration.instanceName)
124129

125-
Await.result(postInstance(instance, configuration.instanceRegistryUri + "/register") map {response =>
130+
Await.result(postInstance(instance, configuration.instanceRegistryUri + "/register")(configuration) map {response =>
126131
if(response.status == StatusCodes.OK){
127132
Await.result(Unmarshal(response.entity).to[String] map { assignedID =>
128133
val id = assignedID.toLong
@@ -152,7 +157,8 @@ object InstanceRegistry extends InstanceJsonSupport with AppLogging
152157
val request = HttpRequest(method = HttpMethods.GET, configuration.instanceRegistryUri +
153158
s"/matchingInstance?Id=${configuration.instanceId.getOrElse(-1)}&ComponentType=ElasticSearch")
154159

155-
Await.result(Http(system).singleRequest(request) map {response =>
160+
Await.result(Http(system).singleRequest(request
161+
.withHeaders(RawHeader("Authorization", s"Bearer ${AuthProvider.generateJwt()(configuration)}"))) map {response =>
156162
response.status match {
157163
case StatusCodes.OK =>
158164
val instanceString : String = Await.result(response.entity.dataBytes.runFold(ByteString(""))(_ ++ _).map(_.utf8String), 5 seconds)
@@ -192,7 +198,8 @@ object InstanceRegistry extends InstanceJsonSupport with AppLogging
192198
configuration.instanceRegistryUri +
193199
s"/matchingResult?CallerId=${configuration.instanceId.getOrElse(-1)}&MatchedInstanceId=$idToPost&MatchingSuccessful=$isElasticSearchReachable")
194200

195-
Await.result(Http(system).singleRequest(request) map {response =>
201+
Await.result(Http(system).singleRequest(request
202+
.withHeaders(RawHeader("Authorization", s"Bearer ${AuthProvider.generateJwt()(configuration)}"))) map {response =>
196203
if(response.status == StatusCodes.OK){
197204
log.info("Successfully posted matching result to Instance Registry.")
198205
Success()
@@ -220,7 +227,8 @@ object InstanceRegistry extends InstanceJsonSupport with AppLogging
220227

221228
val request = HttpRequest(method = HttpMethods.POST, configuration.instanceRegistryUri + s"/deregister?Id=$id")
222229

223-
Await.result(Http(system).singleRequest(request) map {response =>
230+
Await.result(Http(system).singleRequest(request
231+
.withHeaders(RawHeader("Authorization", s"Bearer ${AuthProvider.generateJwt()(configuration)}"))) map {response =>
224232
if(response.status == StatusCodes.OK){
225233
log.info("Successfully deregistered from Instance Registry.")
226234
Success()
@@ -238,9 +246,11 @@ object InstanceRegistry extends InstanceJsonSupport with AppLogging
238246
}
239247
}
240248

241-
def postInstance(instance : Instance, uri: String) () : Future[HttpResponse] = {
249+
def postInstance(instance : Instance, uri: String) (implicit configuration: Configuration) : Future[HttpResponse] = {
242250
Try(HttpRequest(method = HttpMethods.POST, uri = uri, entity = instance.toJson(instanceFormat).toString())) match {
243-
case Success(request) => Http(system).singleRequest(request)
251+
//use generic name for startup, no id present at this point
252+
case Success(request) => Http(system).singleRequest(request
253+
.withHeaders(RawHeader("Authorization", s"Bearer ${AuthProvider.generateJwt(useGenericName = true)(configuration)}")))
244254
case Failure(dx) =>
245255
log.warning(s"Failed to deregister to Instance Registry, exception: $dx")
246256
Future.failed(dx)

0 commit comments

Comments
 (0)