From 8d2997ea79392050d639fc56866aa8e9dceb9843 Mon Sep 17 00:00:00 2001 From: almacken Date: Mon, 28 May 2018 16:14:13 -0600 Subject: [PATCH 1/7] Added feature endpoint. Note that it requires features to be mapped to feature descriptions to return them, and currently no features are mapped so no features are returned. --- build.sbt | 3 + .../FeatureDescription.scala | 4 + .../FeatureListMapping.scala | 404 ++++++++++++++++++ .../cs/swt/delphi/webapi/JsonSupport.scala | 6 + .../de/upb/cs/swt/delphi/webapi/Server.scala | 6 +- 5 files changed, 421 insertions(+), 2 deletions(-) create mode 100644 src/main/scala/de/upb/cs/swt/delphi/featuredefinitions/FeatureDescription.scala create mode 100644 src/main/scala/de/upb/cs/swt/delphi/featuredefinitions/FeatureListMapping.scala create mode 100644 src/main/scala/de/upb/cs/swt/delphi/webapi/JsonSupport.scala diff --git a/build.sbt b/build.sbt index e418ad3..9293e5f 100644 --- a/build.sbt +++ b/build.sbt @@ -6,6 +6,9 @@ scalaVersion := "2.12.4" libraryDependencies += "org.parboiled" %% "parboiled" % "2.1.4" libraryDependencies += "com.typesafe.akka" %% "akka-http" % "10.0.11" +libraryDependencies += "com.typesafe.akka" %% "akka-stream" % "2.5.12" +libraryDependencies += "com.typesafe.akka" %% "akka-http-spray-json" % "10.1.1" +libraryDependencies += "io.spray" %% "spray-json" % "1.3.3" val elastic4sVersion = "6.2.8" libraryDependencies ++= Seq( diff --git a/src/main/scala/de/upb/cs/swt/delphi/featuredefinitions/FeatureDescription.scala b/src/main/scala/de/upb/cs/swt/delphi/featuredefinitions/FeatureDescription.scala new file mode 100644 index 0000000..944d03a --- /dev/null +++ b/src/main/scala/de/upb/cs/swt/delphi/featuredefinitions/FeatureDescription.scala @@ -0,0 +1,4 @@ +package de.upb.cs.swt.delphi.featuredefinitions + +//Describes all relevant fields for conducting searches on this class +case class FeatureDescription(fType: String) diff --git a/src/main/scala/de/upb/cs/swt/delphi/featuredefinitions/FeatureListMapping.scala b/src/main/scala/de/upb/cs/swt/delphi/featuredefinitions/FeatureListMapping.scala new file mode 100644 index 0000000..d4c165f --- /dev/null +++ b/src/main/scala/de/upb/cs/swt/delphi/featuredefinitions/FeatureListMapping.scala @@ -0,0 +1,404 @@ +package de.upb.cs.swt.delphi.featuredefinitions + +object FeatureListMapping { + + //Returns a list of all defined features + def featureList: List[String] = + (for {(key, des) <- featureMap if des != null} yield key).toList + + //Maps all field names onto descriptions of that field. Null fields cannot be searched in the current version + private val featureMap = Map[String, FeatureDescription]( + "ProjectPackages" -> null, + "⟨SizeOfInheritanceTree⟩" -> null, + "ProjectFields" -> null, + "LibraryMethods" -> null, + "ProjectInstructions" -> null, + "ProjectMethods" -> null, + "LibraryClassFiles" -> null, + "LibraryFields" -> null, + "LibraryPackages" -> null, + "ProjectClassFiles" -> null, + "0 FPC" -> null, + "1-3 FPC" -> null, + "4-10 FPC" -> null, + ">10 FPC" -> null, + "0 MPC" -> null, + "1-3 MPC" -> null, + "4-10 MPC" -> null, + ">10 MPC" -> null, + "1-3 CPP" -> null, + "4-10 CPP" -> null, + ">10 CPP" -> null, + "0 NOC" -> null, + "1-3 NOC" -> null, + "4-10 NOC" -> null, + ">10 NOC" -> null, + "linear methods (McCabe)" -> null, + "2-3 McCabe" -> null, + "4-10 McCabe" -> null, + ">10 McCabe" -> null, + "Designator" -> null, + "Taxonomy" -> null, + "Joiner" -> null, + "Pool" -> null, + "Function Pointer" -> null, + "Function Object" -> null, + "Cobol Like" -> null, + "Stateless" -> null, + "Common State" -> null, + "Immutable" -> null, + "Restricted Creation" -> null, + "Sampler" -> null, + "Box" -> null, + "Compound Box" -> null, + "Canopy" -> null, + "Record" -> null, + "Data Manager" -> null, + "Sink" -> null, + "Outline" -> null, + "Trait" -> null, + "State Machine" -> null, + "Pure Type" -> null, + "Augmented Type" -> null, + "Pseudo Class" -> null, + "Implementor" -> null, + "Overrider" -> null, + "Extender" -> null, + "unused private fields" -> null, + "unused package visible fields" -> null, + "unused protected fields" -> null, + "unused public fields" -> null, + "package visible fields only used by defining type" -> null, + "protected fields only used by defining type" -> null, + "public fields only used by defininig type " -> null, + "Trivial Class.forName Usage" -> null, + "Nontrivial Class.forName Usage" -> null, + "nop (opcode:0)" -> null, + "aconst_null (opcode:1)" -> null, + "iconst_m1 (opcode:2)" -> null, + "iconst_0 (opcode:3)" -> null, + "iconst_1 (opcode:4)" -> null, + "iconst_2 (opcode:5)" -> null, + "iconst_3 (opcode:6)" -> null, + "iconst_4 (opcode:7)" -> null, + "iconst_5 (opcode:8)" -> null, + "lconst_0 (opcode:9)" -> null, + "lconst_1 (opcode:10)" -> null, + "fconst_0 (opcode:11)" -> null, + "fconst_1 (opcode:12)" -> null, + "fconst_2 (opcode:13)" -> null, + "dconst_0 (opcode:14)" -> null, + "dconst_1 (opcode:15)" -> null, + "bipush (opcode:16)" -> null, + "sipush (opcode:17)" -> null, + "ldc (opcode:18)" -> null, + "ldc_w (opcode:19)" -> null, + "ldc2_w (opcode:20)" -> null, + "iload (opcode:21)" -> null, + "lload (opcode:22)" -> null, + "fload (opcode:23)" -> null, + "dload (opcode:24)" -> null, + "aload (opcode:25)" -> null, + "iload_0 (opcode:26)" -> null, + "iload_1 (opcode:27)" -> null, + "iload_2 (opcode:28)" -> null, + "iload_3 (opcode:29)" -> null, + "lload_0 (opcode:30)" -> null, + "lload_1 (opcode:31)" -> null, + "lload_2 (opcode:32)" -> null, + "lload_3 (opcode:33)" -> null, + "fload_0 (opcode:34)" -> null, + "fload_1 (opcode:35)" -> null, + "fload_2 (opcode:36)" -> null, + "fload_3 (opcode:37)" -> null, + "dload_0 (opcode:38)" -> null, + "dload_1 (opcode:39)" -> null, + "dload_2 (opcode:40)" -> null, + "dload_3 (opcode:41)" -> null, + "aload_0 (opcode:42)" -> null, + "aload_1 (opcode:43)" -> null, + "aload_2 (opcode:44)" -> null, + "aload_3 (opcode:45)" -> null, + "iaload (opcode:46)" -> null, + "laload (opcode:47)" -> null, + "faload (opcode:48)" -> null, + "daload (opcode:49)" -> null, + "aaload (opcode:50)" -> null, + "baload (opcode:51)" -> null, + "caload (opcode:52)" -> null, + "saload (opcode:53)" -> null, + "istore (opcode:54)" -> null, + "lstore (opcode:55)" -> null, + "fstore (opcode:56)" -> null, + "dstore (opcode:57)" -> null, + "astore (opcode:58)" -> null, + "istore_0 (opcode:59)" -> null, + "istore_1 (opcode:60)" -> null, + "istore_2 (opcode:61)" -> null, + "istore_3 (opcode:62)" -> null, + "lstore_0 (opcode:63)" -> null, + "lstore_1 (opcode:64)" -> null, + "lstore_2 (opcode:65)" -> null, + "lstore_3 (opcode:66)" -> null, + "fstore_0 (opcode:67)" -> null, + "fstore_1 (opcode:68)" -> null, + "fstore_2 (opcode:69)" -> null, + "fstore_3 (opcode:70)" -> null, + "dstore_0 (opcode:71)" -> null, + "dstore_1 (opcode:72)" -> null, + "dstore_2 (opcode:73)" -> null, + "dstore_3 (opcode:74)" -> null, + "astore_0 (opcode:75)" -> null, + "astore_1 (opcode:76)" -> null, + "astore_2 (opcode:77)" -> null, + "astore_3 (opcode:78)" -> null, + "iastore (opcode:79)" -> null, + "lastore (opcode:80)" -> null, + "fastore (opcode:81)" -> null, + "dastore (opcode:82)" -> null, + "aastore (opcode:83)" -> null, + "bastore (opcode:84)" -> null, + "castore (opcode:85)" -> null, + "sastore (opcode:86)" -> null, + "pop (opcode:87)" -> null, + "pop2 (opcode:88)" -> null, + "dup (opcode:89)" -> null, + "dup_x1 (opcode:90)" -> null, + "dup_x2 (opcode:91)" -> null, + "dup2 (opcode:92)" -> null, + "dup2_x1 (opcode:93)" -> null, + "dup2_x2 (opcode:94)" -> null, + "swap (opcode:95)" -> null, + "iadd (opcode:96)" -> null, + "ladd (opcode:97)" -> null, + "fadd (opcode:98)" -> null, + "dadd (opcode:99)" -> null, + "isub (opcode:100)" -> null, + "lsub (opcode:101)" -> null, + "fsub (opcode:102)" -> null, + "dsub (opcode:103)" -> null, + "imul (opcode:104)" -> null, + "lmul (opcode:105)" -> null, + "fmul (opcode:106)" -> null, + "dmul (opcode:107)" -> null, + "idiv (opcode:108)" -> null, + "ldiv (opcode:109)" -> null, + "fdiv (opcode:110)" -> null, + "ddiv (opcode:111)" -> null, + "irem (opcode:112)" -> null, + "lrem (opcode:113)" -> null, + "frem (opcode:114)" -> null, + "drem (opcode:115)" -> null, + "ineg (opcode:116)" -> null, + "lneg (opcode:117)" -> null, + "fneg (opcode:118)" -> null, + "dneg (opcode:119)" -> null, + "ishl (opcode:120)" -> null, + "lshl (opcode:121)" -> null, + "ishr (opcode:122)" -> null, + "lshr (opcode:123)" -> null, + "iushr (opcode:124)" -> null, + "lushr (opcode:125)" -> null, + "iand (opcode:126)" -> null, + "land (opcode:127)" -> null, + "ior (opcode:128)" -> null, + "lor (opcode:129)" -> null, + "ixor (opcode:130)" -> null, + "lxor (opcode:131)" -> null, + "iinc (opcode:132)" -> null, + "i2l (opcode:133)" -> null, + "i2f (opcode:134)" -> null, + "i2d (opcode:135)" -> null, + "l2i (opcode:136)" -> null, + "l2f (opcode:137)" -> null, + "l2d (opcode:138)" -> null, + "f2i (opcode:139)" -> null, + "f2l (opcode:140)" -> null, + "f2d (opcode:141)" -> null, + "d2i (opcode:142)" -> null, + "d2l (opcode:143)" -> null, + "d2f (opcode:144)" -> null, + "i2b (opcode:145)" -> null, + "i2c (opcode:146)" -> null, + "i2s (opcode:147)" -> null, + "lcmp (opcode:148)" -> null, + "fcmpl (opcode:149)" -> null, + "fcmpg (opcode:150)" -> null, + "dcmpl (opcode:151)" -> null, + "dcmpg (opcode:152)" -> null, + "ifeq (opcode:153)" -> null, + "ifne (opcode:154)" -> null, + "iflt (opcode:155)" -> null, + "ifge (opcode:156)" -> null, + "ifgt (opcode:157)" -> null, + "ifle (opcode:158)" -> null, + "if_icmpeq (opcode:159)" -> null, + "if_icmpne (opcode:160)" -> null, + "if_icmplt (opcode:161)" -> null, + "if_icmpge (opcode:162)" -> null, + "if_icmpgt (opcode:163)" -> null, + "if_icmple (opcode:164)" -> null, + "if_acmpeq (opcode:165)" -> null, + "if_acmpne (opcode:166)" -> null, + "goto (opcode:167)" -> null, + "jsr (opcode:168)" -> null, + "ret (opcode:169)" -> null, + "tableswitch (opcode:170)" -> null, + "lookupswitch (opcode:171)" -> null, + "ireturn (opcode:172)" -> null, + "lreturn (opcode:173)" -> null, + "freturn (opcode:174)" -> null, + "dreturn (opcode:175)" -> null, + "areturn (opcode:176)" -> null, + "return (opcode:177)" -> null, + "getstatic (opcode:178)" -> null, + "putstatic (opcode:179)" -> null, + "getfield (opcode:180)" -> null, + "putfield (opcode:181)" -> null, + "invokevirtual (opcode:182)" -> null, + "invokespecial (opcode:183)" -> null, + "invokestatic (opcode:184)" -> null, + "invokeinterface (opcode:185)" -> null, + "invokedynamic (opcode:186)" -> null, + "new (opcode:187)" -> null, + "newarray (opcode:188)" -> null, + "anewarray (opcode:189)" -> null, + "arraylength (opcode:190)" -> null, + "athrow (opcode:191)" -> null, + "checkcast (opcode:192)" -> null, + "instanceof (opcode:193)" -> null, + "monitorenter (opcode:194)" -> null, + "monitorexit (opcode:195)" -> null, + "wide (opcode:196)" -> null, + "multianewarray (opcode:197)" -> null, + "ifnull (opcode:198)" -> null, + "ifnonnull (opcode:199)" -> null, + "goto_w (opcode:200)" -> null, + "jsr_w (opcode:201)" -> null, + "Self-recursive Data Structure" -> null, + "Mutually-recursive Data Structure 2 Types" -> null, + "Mutually-recursive Data Structure 3 Types" -> null, + "Mutually-recursive Data Structure 4 Types" -> null, + "Mutually-recursive Data Structure more than 4 Types" -> null, + "Never Returns Normally" -> null, + "Method with Infinite Loop" -> null, + "Class File With Source Attribute" -> null, + "Method With Line Number Table" -> null, + "Method With Local Variable Table" -> null, + "Method With Local Variable Type Table" -> null, + "FanOut - Category 1" -> null, + "FanOut - Category 2" -> null, + "FanOut - Category 3" -> null, + "FanOut - Category 4" -> null, + "FanOut - Category 5" -> null, + "FanOut - Category 6" -> null, + "FanIn - Category 1" -> null, + "FanIn - Category 2" -> null, + "FanIn - Category 3" -> null, + "FanIn - Category 4" -> null, + "FanIn - Category 5" -> null, + "FanIn - Category 6" -> null, + "FanIn/FanOut - Category 1" -> null, + "FanIn/FanOut - Category 2" -> null, + "FanIn/FanOut - Category 3" -> null, + "FanIn/FanOut - Category 4" -> null, + "FanIn/FanOut - Category 5" -> null, + "FanIn/FanOut - Category 6" -> null, + "custom ClassLoader implementation" -> null, + "Retrieving the SystemClassLoader" -> null, + "Retrieving some ClassLoader" -> null, + "define new classes/packages" -> null, + "accessing resources" -> null, + "javax.crypto.Cipher getInstance" -> null, + "using SecureRandom" -> null, + "using MessageDigest" -> null, + "using Signature" -> null, + "using Mac" -> null, + "cryptographic key handling" -> null, + "using KeyStore" -> null, + "using Certificates" -> null, + "native methods" -> null, + "synthetic methods" -> null, + "bridge methods" -> null, + "synchronized methods" -> null, + "varargs methods" -> null, + "static initializers" -> null, + "static methods (not including static initializers)" -> null, + "constructors" -> null, + "instance methods" -> null, + "java.lang.Class forName" -> null, + "reflective instance creation" -> null, + "reflective field write" -> null, + "reflective field read" -> null, + "makes fields accessible" -> null, + "makes methods or constructors accessible" -> null, + "makes an AccessibleObject accessible (exact type unknown)" -> null, + "java.lang.reflect.Method Object invoke(Object, Object[])" -> null, + "java.lang.invoke.MethodHandles lookup" -> null, + "java.lang.invoke.MethodHandles publicLookup" -> null, + "method handle invocation" -> null, + "java.lang.reflect.Proxy newProxyInstance" -> null, + "Process" -> null, + "JVM exit" -> null, + "Native Libraries" -> null, + "java.lang.System getSecurityManager" -> null, + "java.lang.System setSecurityManager" -> null, + "Environment" -> null, + "Sound" -> null, + "Network sockets" -> null, + "Object-based Thread Notification" -> null, + "Usage of Thread API" -> null, + "Usage of ThreadGroup API" -> null, + "sun.misc.Unsafe sun.misc.Unsafe getUnsafe()" -> null, + "Unsafe - Alloc" -> null, + "Unsafe - Array" -> null, + "Unsafe - compareAndSwap" -> null, + "Unsafe - Class" -> null, + "Unsafe - Fence" -> null, + "Unsafe - Fetch & Add" -> null, + "Unsafe - Heap" -> null, + "Unsafe - Heap Get" -> null, + "Unsafe - Heap Put" -> null, + "Misc" -> null, + "Unsafe - Monitor" -> null, + "Unsafe - Off-Heap" -> null, + "Unsafe - Offset" -> null, + "Unsafe - Ordered Put" -> null, + "Unsafe - Park" -> null, + "Unsafe - Throw" -> null, + "Unsafe - Volatile Get" -> null, + "Unsafe - Volatile Put" -> null, + "java.sql.DriverManager getConnection" -> null, + "java.sql.Connection rollback" -> null, + "creation and execution of java.sql.Statement" -> null, + "creation and execution of java.sql.PreparedStatement" -> null, + "creation and execution of java.sql.CallableStatement" -> null, + "class file retransformation" -> null, + "instrumenting native methods" -> null, + "appending class loader search" -> null, + "retrieve classes information" -> null, + "(concrete) classes" -> null, + "abstract classes" -> null, + "annotations" -> null, + "enumerations" -> null, + "marker interfaces" -> null, + "simple functional interfaces (single abstract method (SAM) interface)" -> null, + "non-functional interface with default methods (Java >8)" -> null, + "non-functional interface with static methods (Java >8)" -> null, + "(standard) interface" -> null, + "module (Java >9)" -> null, + "Very Small Inheritance Tree" -> null, + "Small Inheritance Tree" -> null, + "Medium Inheritance Tree" -> null, + "Large Inheritance Tree" -> null, + "Very Large Inheritance Tree" -> null, + "Huge Inheritance Tree" -> null, + "Size of the Inheritance Tree Unknown" -> null, + "Class File JDK 1.1 (JDK 1.0.2)" -> null, + "Class File Java 5" -> null, + "Class File Java 6" -> null, + "Class File Java 7" -> null, + "Class File Java 8" -> null, + "Class File Java 9" -> null + ) +} diff --git a/src/main/scala/de/upb/cs/swt/delphi/webapi/JsonSupport.scala b/src/main/scala/de/upb/cs/swt/delphi/webapi/JsonSupport.scala new file mode 100644 index 0000000..4d8c771 --- /dev/null +++ b/src/main/scala/de/upb/cs/swt/delphi/webapi/JsonSupport.scala @@ -0,0 +1,6 @@ +package de.upb.cs.swt.delphi.webapi + +import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport +import spray.json._ + +trait JsonSupport extends SprayJsonSupport with DefaultJsonProtocol \ No newline at end of file diff --git a/src/main/scala/de/upb/cs/swt/delphi/webapi/Server.scala b/src/main/scala/de/upb/cs/swt/delphi/webapi/Server.scala index 7a7c757..36dd6f0 100644 --- a/src/main/scala/de/upb/cs/swt/delphi/webapi/Server.scala +++ b/src/main/scala/de/upb/cs/swt/delphi/webapi/Server.scala @@ -1,11 +1,13 @@ package de.upb.cs.swt.delphi.webapi import akka.http.scaladsl.server.HttpApp +import de.upb.cs.swt.delphi.featuredefinitions.FeatureListMapping +import spray.json._ /** * Web server configuration for Delphi web API. */ -object Server extends HttpApp { +object Server extends HttpApp with JsonSupport { override def routes = path("version") { version } ~ @@ -25,7 +27,7 @@ object Server extends HttpApp { private def features = { get { complete { - "features" + FeatureListMapping.featureList.toJson } } } From 4af0304c6d09bb2429cdbc003ede137e4ad92fbf Mon Sep 17 00:00:00 2001 From: almacken Date: Tue, 29 May 2018 12:33:16 -0600 Subject: [PATCH 2/7] Added retrieve endpoint --- .../cs/swt/delphi/webapi/ElasticClient.scala | 20 +++++++++++++++++++ .../de/upb/cs/swt/delphi/webapi/Server.scala | 4 +++- 2 files changed, 23 insertions(+), 1 deletion(-) create mode 100644 src/main/scala/de/upb/cs/swt/delphi/webapi/ElasticClient.scala diff --git a/src/main/scala/de/upb/cs/swt/delphi/webapi/ElasticClient.scala b/src/main/scala/de/upb/cs/swt/delphi/webapi/ElasticClient.scala new file mode 100644 index 0000000..8e12d13 --- /dev/null +++ b/src/main/scala/de/upb/cs/swt/delphi/webapi/ElasticClient.scala @@ -0,0 +1,20 @@ +package de.upb.cs.swt.delphi.webapi + +import com.sksamuel.elastic4s.http.ElasticDsl._ +import com.sksamuel.elastic4s.http.HttpClient + +object ElasticClient { + + val configuration = new Configuration() + val client = HttpClient(configuration.elasticsearchClientUri) + val index = "delphi" / "project" + + //Returns an entry with the given ID as an option + def getSource(id: String) = + client.execute{ + get(id).from(index) + }.await match { + case Right(res) => res.body + case Left(_) => Option.empty + } +} diff --git a/src/main/scala/de/upb/cs/swt/delphi/webapi/Server.scala b/src/main/scala/de/upb/cs/swt/delphi/webapi/Server.scala index 36dd6f0..16926b4 100644 --- a/src/main/scala/de/upb/cs/swt/delphi/webapi/Server.scala +++ b/src/main/scala/de/upb/cs/swt/delphi/webapi/Server.scala @@ -34,7 +34,9 @@ object Server extends HttpApp with JsonSupport { def retrieve(identifier: String) = { get { - complete(identifier) + complete( + ElasticClient.getSource(identifier) + ) } } From 769c2b66489a550b67e1a29ccde2c86f39836cfd Mon Sep 17 00:00:00 2001 From: almacken Date: Tue, 5 Jun 2018 16:33:33 -0600 Subject: [PATCH 3/7] Refactored Retrieve to use Actors --- .../cs/swt/delphi/webapi/ElasticActor.scala | 39 +++++++++++++++++++ .../delphi/webapi/ElasticActorManager.scala | 28 +++++++++++++ .../cs/swt/delphi/webapi/ElasticClient.scala | 20 ---------- .../de/upb/cs/swt/delphi/webapi/Server.scala | 14 ++++++- 4 files changed, 80 insertions(+), 21 deletions(-) create mode 100644 src/main/scala/de/upb/cs/swt/delphi/webapi/ElasticActor.scala create mode 100644 src/main/scala/de/upb/cs/swt/delphi/webapi/ElasticActorManager.scala delete mode 100644 src/main/scala/de/upb/cs/swt/delphi/webapi/ElasticClient.scala diff --git a/src/main/scala/de/upb/cs/swt/delphi/webapi/ElasticActor.scala b/src/main/scala/de/upb/cs/swt/delphi/webapi/ElasticActor.scala new file mode 100644 index 0000000..1906a77 --- /dev/null +++ b/src/main/scala/de/upb/cs/swt/delphi/webapi/ElasticActor.scala @@ -0,0 +1,39 @@ +package de.upb.cs.swt.delphi.webapi + +import akka.actor.{Actor, ActorLogging, PoisonPill, Props, ReceiveTimeout} +import com.sksamuel.elastic4s.IndexAndType +import com.sksamuel.elastic4s.http.ElasticDsl._ +import com.sksamuel.elastic4s.http.HttpClient +import de.upb.cs.swt.delphi.webapi.ElasticActor.GetSource + +import scala.concurrent.duration._ + +class ElasticActor(configuration: Configuration) extends Actor with ActorLogging{ + + val client = HttpClient(configuration.elasticsearchClientUri) + + override def preStart(): Unit = log.info("Search actor started") + override def postStop(): Unit = log.info("Search actor shut down") + context.setReceiveTimeout(2 seconds) + + override def receive = { + case GetSource(id, index) => { + log.info("Executing get on entry {}", id) + def source = client.execute{ + get(id).from(index) + }.await match { + case Right(res) => res.body.get + case Left(_) => Option.empty + } + sender().tell(source, context.self) + context.stop(self) + } + case ReceiveTimeout => context.stop(self) + } +} + +object ElasticActor{ + def props(configuration: Configuration) : Props = Props(new ElasticActor(configuration)) + + final case class GetSource(id: String, index: IndexAndType) +} diff --git a/src/main/scala/de/upb/cs/swt/delphi/webapi/ElasticActorManager.scala b/src/main/scala/de/upb/cs/swt/delphi/webapi/ElasticActorManager.scala new file mode 100644 index 0000000..5dbd562 --- /dev/null +++ b/src/main/scala/de/upb/cs/swt/delphi/webapi/ElasticActorManager.scala @@ -0,0 +1,28 @@ +package de.upb.cs.swt.delphi.webapi + +import akka.actor.{Actor, ActorLogging, Props} +import com.sksamuel.elastic4s.http.ElasticDsl._ +import de.upb.cs.swt.delphi.webapi.ElasticActorManager.Retrieve +import de.upb.cs.swt.delphi.webapi.ElasticActor.GetSource + +class ElasticActorManager(configuration: Configuration) extends Actor with ActorLogging{ + + private val index = "delphi" / "project" + + override def preStart(): Unit = log.info("Actor manager started") + override def postStop(): Unit = log.info("Actor manager shut down") + + override def receive = { + case Retrieve(id) => { + log.info("Creating actor to search for entry {}", id) + val retrieveActor = context.actorOf(ElasticActor.props(configuration)) + retrieveActor forward GetSource(id, index) + } + } +} + +object ElasticActorManager{ + def props(configuration: Configuration) : Props = Props(new ElasticActorManager(configuration)) + + final case class Retrieve(id: String) +} \ No newline at end of file diff --git a/src/main/scala/de/upb/cs/swt/delphi/webapi/ElasticClient.scala b/src/main/scala/de/upb/cs/swt/delphi/webapi/ElasticClient.scala deleted file mode 100644 index 8e12d13..0000000 --- a/src/main/scala/de/upb/cs/swt/delphi/webapi/ElasticClient.scala +++ /dev/null @@ -1,20 +0,0 @@ -package de.upb.cs.swt.delphi.webapi - -import com.sksamuel.elastic4s.http.ElasticDsl._ -import com.sksamuel.elastic4s.http.HttpClient - -object ElasticClient { - - val configuration = new Configuration() - val client = HttpClient(configuration.elasticsearchClientUri) - val index = "delphi" / "project" - - //Returns an entry with the given ID as an option - def getSource(id: String) = - client.execute{ - get(id).from(index) - }.await match { - case Right(res) => res.body - case Left(_) => Option.empty - } -} diff --git a/src/main/scala/de/upb/cs/swt/delphi/webapi/Server.scala b/src/main/scala/de/upb/cs/swt/delphi/webapi/Server.scala index 16926b4..a736dd3 100644 --- a/src/main/scala/de/upb/cs/swt/delphi/webapi/Server.scala +++ b/src/main/scala/de/upb/cs/swt/delphi/webapi/Server.scala @@ -1,7 +1,13 @@ package de.upb.cs.swt.delphi.webapi +import java.util.concurrent.TimeUnit + +import akka.actor.ActorSystem import akka.http.scaladsl.server.HttpApp +import akka.pattern.ask +import akka.util.Timeout import de.upb.cs.swt.delphi.featuredefinitions.FeatureListMapping +import de.upb.cs.swt.delphi.webapi.ElasticActorManager.Retrieve import spray.json._ /** @@ -9,6 +15,11 @@ import spray.json._ */ object Server extends HttpApp with JsonSupport { + private val configuration = new Configuration() + private val system = ActorSystem("delphi-webapi") + private val actorManager = system.actorOf(ElasticActorManager.props(configuration)) + implicit val timeout = Timeout(5, TimeUnit.SECONDS) + override def routes = path("version") { version } ~ path("features") { features } ~ @@ -35,7 +46,7 @@ object Server extends HttpApp with JsonSupport { def retrieve(identifier: String) = { get { complete( - ElasticClient.getSource(identifier) + (actorManager ? Retrieve(identifier)).mapTo[String] ) } } @@ -51,6 +62,7 @@ object Server extends HttpApp with JsonSupport { def main(args: Array[String]): Unit = { val configuration = new Configuration() Server.startServer(configuration.bindHost, configuration.bindPort) + system.terminate() } From 3f69e794c1b6e37c08469fcac56d293a1684e5be Mon Sep 17 00:00:00 2001 From: almacken Date: Wed, 6 Jun 2018 15:15:29 -0600 Subject: [PATCH 4/7] Added Enqueue route, using a priority mailbox --- src/main/resources/application.conf | 9 +++++++++ .../upb/cs/swt/delphi/webapi/AppLogging.scala | 8 ++++++++ .../upb/cs/swt/delphi/webapi/ElasticActor.scala | 2 +- .../swt/delphi/webapi/ElasticActorManager.scala | 17 +++++++++++------ .../delphi/webapi/ElasticPriorityMailbox.scala | 14 ++++++++++++++ .../de/upb/cs/swt/delphi/webapi/Server.scala | 17 ++++++++++++++--- 6 files changed, 57 insertions(+), 10 deletions(-) create mode 100644 src/main/resources/application.conf create mode 100644 src/main/scala/de/upb/cs/swt/delphi/webapi/AppLogging.scala create mode 100644 src/main/scala/de/upb/cs/swt/delphi/webapi/ElasticPriorityMailbox.scala diff --git a/src/main/resources/application.conf b/src/main/resources/application.conf new file mode 100644 index 0000000..9ebf7f5 --- /dev/null +++ b/src/main/resources/application.conf @@ -0,0 +1,9 @@ +es-priority-mailbox { + mailbox-type = "de.upb.cs.swt.delphi.webapi.ElasticPriorityMailbox" +} + +akka.actor.deployment { + /espriomailboxactor { + mailbox = es-priority-mailbox + } +} \ No newline at end of file diff --git a/src/main/scala/de/upb/cs/swt/delphi/webapi/AppLogging.scala b/src/main/scala/de/upb/cs/swt/delphi/webapi/AppLogging.scala new file mode 100644 index 0000000..673d2f2 --- /dev/null +++ b/src/main/scala/de/upb/cs/swt/delphi/webapi/AppLogging.scala @@ -0,0 +1,8 @@ +package de.upb.cs.swt.delphi.webapi + +import akka.actor.{ActorSystem, ExtendedActorSystem} +import akka.event.{BusLogging, LoggingAdapter} + +trait AppLogging { + def log(implicit system: ActorSystem): LoggingAdapter = new BusLogging(system.eventStream, this.getClass.getName, this.getClass, system.asInstanceOf[ExtendedActorSystem].logFilter) +} diff --git a/src/main/scala/de/upb/cs/swt/delphi/webapi/ElasticActor.scala b/src/main/scala/de/upb/cs/swt/delphi/webapi/ElasticActor.scala index 1906a77..a0792bf 100644 --- a/src/main/scala/de/upb/cs/swt/delphi/webapi/ElasticActor.scala +++ b/src/main/scala/de/upb/cs/swt/delphi/webapi/ElasticActor.scala @@ -1,6 +1,6 @@ package de.upb.cs.swt.delphi.webapi -import akka.actor.{Actor, ActorLogging, PoisonPill, Props, ReceiveTimeout} +import akka.actor.{Actor, ActorLogging, Props, ReceiveTimeout} import com.sksamuel.elastic4s.IndexAndType import com.sksamuel.elastic4s.http.ElasticDsl._ import com.sksamuel.elastic4s.http.HttpClient diff --git a/src/main/scala/de/upb/cs/swt/delphi/webapi/ElasticActorManager.scala b/src/main/scala/de/upb/cs/swt/delphi/webapi/ElasticActorManager.scala index 5dbd562..b115d26 100644 --- a/src/main/scala/de/upb/cs/swt/delphi/webapi/ElasticActorManager.scala +++ b/src/main/scala/de/upb/cs/swt/delphi/webapi/ElasticActorManager.scala @@ -2,7 +2,7 @@ package de.upb.cs.swt.delphi.webapi import akka.actor.{Actor, ActorLogging, Props} import com.sksamuel.elastic4s.http.ElasticDsl._ -import de.upb.cs.swt.delphi.webapi.ElasticActorManager.Retrieve +import de.upb.cs.swt.delphi.webapi.ElasticActorManager.{Enqueue, Retrieve} import de.upb.cs.swt.delphi.webapi.ElasticActor.GetSource class ElasticActorManager(configuration: Configuration) extends Actor with ActorLogging{ @@ -13,16 +13,21 @@ class ElasticActorManager(configuration: Configuration) extends Actor with Actor override def postStop(): Unit = log.info("Actor manager shut down") override def receive = { - case Retrieve(id) => { - log.info("Creating actor to search for entry {}", id) - val retrieveActor = context.actorOf(ElasticActor.props(configuration)) - retrieveActor forward GetSource(id, index) - } + case Retrieve(id) => getSource(id) + case Enqueue(id) => getSource(id) + } + + private def getSource(id: String) = { + log.info("Creating actor to search for entry {}", id) + val retrieveActor = context.actorOf(ElasticActor.props(configuration)) + retrieveActor forward GetSource(id, index) } } object ElasticActorManager{ def props(configuration: Configuration) : Props = Props(new ElasticActorManager(configuration)) + .withMailbox("es-priority-mailbox") final case class Retrieve(id: String) + final case class Enqueue(id: String) } \ No newline at end of file diff --git a/src/main/scala/de/upb/cs/swt/delphi/webapi/ElasticPriorityMailbox.scala b/src/main/scala/de/upb/cs/swt/delphi/webapi/ElasticPriorityMailbox.scala new file mode 100644 index 0000000..5600c01 --- /dev/null +++ b/src/main/scala/de/upb/cs/swt/delphi/webapi/ElasticPriorityMailbox.scala @@ -0,0 +1,14 @@ +package de.upb.cs.swt.delphi.webapi + +import akka.actor.ActorSystem +import akka.dispatch.{PriorityGenerator, UnboundedStablePriorityMailbox} +import de.upb.cs.swt.delphi.webapi.ElasticActorManager.{Enqueue, Retrieve} +import com.typesafe.config.Config + +class ElasticPriorityMailbox (settings: ActorSystem.Settings, config: Config) + extends UnboundedStablePriorityMailbox( + PriorityGenerator{ + case Retrieve(_) => 5 + case Enqueue(_) => 1 + case _ => 2 + }) diff --git a/src/main/scala/de/upb/cs/swt/delphi/webapi/Server.scala b/src/main/scala/de/upb/cs/swt/delphi/webapi/Server.scala index a736dd3..06b1f76 100644 --- a/src/main/scala/de/upb/cs/swt/delphi/webapi/Server.scala +++ b/src/main/scala/de/upb/cs/swt/delphi/webapi/Server.scala @@ -7,13 +7,13 @@ import akka.http.scaladsl.server.HttpApp import akka.pattern.ask import akka.util.Timeout import de.upb.cs.swt.delphi.featuredefinitions.FeatureListMapping -import de.upb.cs.swt.delphi.webapi.ElasticActorManager.Retrieve +import de.upb.cs.swt.delphi.webapi.ElasticActorManager.{Enqueue, Retrieve} import spray.json._ /** * Web server configuration for Delphi web API. */ -object Server extends HttpApp with JsonSupport { +object Server extends HttpApp with JsonSupport with AppLogging { private val configuration = new Configuration() private val system = ActorSystem("delphi-webapi") @@ -24,7 +24,8 @@ object Server extends HttpApp with JsonSupport { path("version") { version } ~ path("features") { features } ~ pathPrefix("search" / Remaining) { query => search(query) } ~ - pathPrefix("retrieve" / Remaining) { identifier => retrieve(identifier) } + pathPrefix("retrieve" / Remaining) { identifier => retrieve(identifier) } ~ + pathPrefix("enqueue" / Remaining) { identifier => enqueue(identifier) } private def version = { @@ -51,6 +52,16 @@ object Server extends HttpApp with JsonSupport { } } + def enqueue(identifier: String) = { + get { + pass { //TODO: Require authorization here + complete( + (actorManager ? Enqueue(identifier)).mapTo[String] + ) + } + } + } + def search(query: String) = { get { complete { From 39b03749cb51af3902214e1b093ebba73e22c1a8 Mon Sep 17 00:00:00 2001 From: almacken Date: Thu, 7 Jun 2018 14:46:31 -0600 Subject: [PATCH 5/7] Added Actor that limits request frequency by IP. This actor forwards requests to another actor, and blocks IPs for a certain amount of time if they make too many requests within a given window. It is not currently functioning, as there is not registration feature in this version, but it should work with a minor modification to the retrieve route. --- src/main/resources/application.conf | 8 ++ .../delphi/webapi/ElasticActorManager.scala | 4 +- .../cs/swt/delphi/webapi/ElasticMessage.scala | 3 + .../delphi/webapi/ElasticRequestLimiter.scala | 81 +++++++++++++++++++ .../de/upb/cs/swt/delphi/webapi/Server.scala | 14 +++- 5 files changed, 105 insertions(+), 5 deletions(-) create mode 100644 src/main/scala/de/upb/cs/swt/delphi/webapi/ElasticMessage.scala create mode 100644 src/main/scala/de/upb/cs/swt/delphi/webapi/ElasticRequestLimiter.scala diff --git a/src/main/resources/application.conf b/src/main/resources/application.conf index 9ebf7f5..4a97c1f 100644 --- a/src/main/resources/application.conf +++ b/src/main/resources/application.conf @@ -6,4 +6,12 @@ akka.actor.deployment { /espriomailboxactor { mailbox = es-priority-mailbox } +} + +akka { + http { + server { + remote-address-header = on + } + } } \ No newline at end of file diff --git a/src/main/scala/de/upb/cs/swt/delphi/webapi/ElasticActorManager.scala b/src/main/scala/de/upb/cs/swt/delphi/webapi/ElasticActorManager.scala index b115d26..756f2f3 100644 --- a/src/main/scala/de/upb/cs/swt/delphi/webapi/ElasticActorManager.scala +++ b/src/main/scala/de/upb/cs/swt/delphi/webapi/ElasticActorManager.scala @@ -28,6 +28,6 @@ object ElasticActorManager{ def props(configuration: Configuration) : Props = Props(new ElasticActorManager(configuration)) .withMailbox("es-priority-mailbox") - final case class Retrieve(id: String) - final case class Enqueue(id: String) + final case class Retrieve(id: String) extends ElasticMessage + final case class Enqueue(id: String) extends ElasticMessage } \ No newline at end of file diff --git a/src/main/scala/de/upb/cs/swt/delphi/webapi/ElasticMessage.scala b/src/main/scala/de/upb/cs/swt/delphi/webapi/ElasticMessage.scala new file mode 100644 index 0000000..f6aae33 --- /dev/null +++ b/src/main/scala/de/upb/cs/swt/delphi/webapi/ElasticMessage.scala @@ -0,0 +1,3 @@ +package de.upb.cs.swt.delphi.webapi + +trait ElasticMessage diff --git a/src/main/scala/de/upb/cs/swt/delphi/webapi/ElasticRequestLimiter.scala b/src/main/scala/de/upb/cs/swt/delphi/webapi/ElasticRequestLimiter.scala new file mode 100644 index 0000000..df9998b --- /dev/null +++ b/src/main/scala/de/upb/cs/swt/delphi/webapi/ElasticRequestLimiter.scala @@ -0,0 +1,81 @@ +package de.upb.cs.swt.delphi.webapi + + +import akka.actor.{Actor, ActorLogging, ActorRef, Props} +import akka.actor.Timers +import akka.http.scaladsl.model.RemoteAddress +import de.upb.cs.swt.delphi.webapi.ElasticRequestLimiter._ + +import scala.concurrent.duration._ +import scala.collection.mutable + +//Limits the number of requests any given IP can make by tracking how many requests an IP has made within a given +// window of time, and timing out any IP that exceeds a threshold by rejecting any further request for a period of time +class ElasticRequestLimiter(configuration: Configuration, nextActor: ActorRef) extends Actor with ActorLogging with Timers { + + private val window = 1 second + private val threshold = 10 + private val timeout = 2 hours + + private var recentIPs: mutable.Map[String, Int] = mutable.Map() + private var blockedIPs: mutable.Set[String] = mutable.Set() + + override def preStart(): Unit = { + log.info("Request limiter started") + timers.startPeriodicTimer(ClearTimer, ClearLogs, window) + } + override def postStop(): Unit = log.info("Request limiter shut down") + + override def receive = { + case Validate(rawIp, message) => { + val ip = rawIp.toOption.map(_.getHostAddress).getOrElse("unknown") + //First, reject IPs marked as blocked + if (blockedIPs.contains(ip)) { + rejectRequest() + } else { + //Check if this IP has made any requests recently + if (recentIPs.contains(ip)) { + //If so, increment their counter and test if they have exceeded the request threshold + recentIPs.update(ip, recentIPs(ip) + 1) + if (recentIPs(ip) > threshold) { + //If the threshold has been exceeded, mark this IP as blocked and reject it, and set up a message to unblock it after a period + blockedIPs += ip + log.info("Blocked IP {} due to exceeding request frequency threshold", ip) + timers.startSingleTimer(ForgiveTimer(ip), Forgive(ip), timeout) + rejectRequest() + } else { + //Else, forward this message + nextActor forward message + } + } else { + //Else, register their request in the map and pass it to the next actor + recentIPs += (ip -> 1) + nextActor forward message + } + } + } + case ClearLogs => + recentIPs.clear() + case Forgive(ip) => { + blockedIPs -= ip + log.info("Forgave IP {} after timeout", ip) + } + } + + //Rejects requests from blocked IPs + private def rejectRequest() = + sender() ! "Sorry, you have exceeded the limit on request frequency for unregistered users.\n" + + "As a result, you have been timed out.\n" + + "Please wait a while or register an account with us to continue using this service." +} + +object ElasticRequestLimiter{ + def props(configuration: Configuration, nextActor: ActorRef) : Props = Props(new ElasticRequestLimiter(configuration, nextActor)) + + final case class Validate(rawIp: RemoteAddress, message: ElasticMessage) + final case object ClearLogs + final case class Forgive(ip: String) + + final case object ClearTimer + final case class ForgiveTimer(ip: String) +} \ No newline at end of file diff --git a/src/main/scala/de/upb/cs/swt/delphi/webapi/Server.scala b/src/main/scala/de/upb/cs/swt/delphi/webapi/Server.scala index 06b1f76..02e72ee 100644 --- a/src/main/scala/de/upb/cs/swt/delphi/webapi/Server.scala +++ b/src/main/scala/de/upb/cs/swt/delphi/webapi/Server.scala @@ -8,6 +8,7 @@ import akka.pattern.ask import akka.util.Timeout import de.upb.cs.swt.delphi.featuredefinitions.FeatureListMapping import de.upb.cs.swt.delphi.webapi.ElasticActorManager.{Enqueue, Retrieve} +import de.upb.cs.swt.delphi.webapi.ElasticRequestLimiter.Validate import spray.json._ /** @@ -18,6 +19,7 @@ object Server extends HttpApp with JsonSupport with AppLogging { private val configuration = new Configuration() private val system = ActorSystem("delphi-webapi") private val actorManager = system.actorOf(ElasticActorManager.props(configuration)) + private val requestLimiter = system.actorOf(ElasticRequestLimiter.props(configuration, actorManager)) implicit val timeout = Timeout(5, TimeUnit.SECONDS) override def routes = @@ -46,9 +48,15 @@ object Server extends HttpApp with JsonSupport with AppLogging { def retrieve(identifier: String) = { get { - complete( - (actorManager ? Retrieve(identifier)).mapTo[String] - ) + pass { //TODO: Require authentication here + complete( + (actorManager ? Retrieve(identifier)).mapTo[String] + ) + } ~ extractClientIP{ ip => + complete( + (requestLimiter ? Validate(ip, Retrieve(identifier))).mapTo[String] + ) + } } } From c6c53dc1f12358310c2efc292f719c564ef82e30 Mon Sep 17 00:00:00 2001 From: almacken Date: Tue, 12 Jun 2018 14:54:28 -0600 Subject: [PATCH 6/7] Improved actor routing --- src/main/resources/application.conf | 11 ++++++++ .../cs/swt/delphi/webapi/Configuration.scala | 13 ++++++--- .../cs/swt/delphi/webapi/ElasticActor.scala | 6 ++--- .../delphi/webapi/ElasticActorManager.scala | 27 ++++++++++++++----- .../cs/swt/delphi/webapi/ElasticMessage.scala | 3 --- .../delphi/webapi/ElasticRequestLimiter.scala | 1 + 6 files changed, 46 insertions(+), 15 deletions(-) delete mode 100644 src/main/scala/de/upb/cs/swt/delphi/webapi/ElasticMessage.scala diff --git a/src/main/resources/application.conf b/src/main/resources/application.conf index 4a97c1f..935d3b3 100644 --- a/src/main/resources/application.conf +++ b/src/main/resources/application.conf @@ -14,4 +14,15 @@ akka { remote-address-header = on } } +} + +# Use this dispatcher for actors that make blocking calls to the Elasticsearch database +elasticsearch-handling-dispatcher { + type = Dispatcher + executor = "thread-pool-executor" + thread-pool-executor { + fixed-pool-size = 4 + # This thread pool is intended for development purposes, and should be increased for production + } + throughput = 1 } \ No newline at end of file diff --git a/src/main/scala/de/upb/cs/swt/delphi/webapi/Configuration.scala b/src/main/scala/de/upb/cs/swt/delphi/webapi/Configuration.scala index f61f6b6..53ed334 100644 --- a/src/main/scala/de/upb/cs/swt/delphi/webapi/Configuration.scala +++ b/src/main/scala/de/upb/cs/swt/delphi/webapi/Configuration.scala @@ -1,13 +1,20 @@ package de.upb.cs.swt.delphi.webapi -import com.sksamuel.elastic4s.ElasticsearchClientUri +import com.sksamuel.elastic4s.{ElasticsearchClientUri, IndexAndType} +import com.sksamuel.elastic4s.http.ElasticDsl._ /** * @author Ben Hermann */ -class Configuration(val bindHost: String = "0.0.0.0", +class Configuration( //Server and Elasticsearch configuration + val bindHost: String = "0.0.0.0", val bindPort: Int = 8080, val elasticsearchClientUri: ElasticsearchClientUri = ElasticsearchClientUri( - sys.env.getOrElse("DELPHI_ELASTIC_URI", "elasticsearch://localhost:9200"))) { + sys.env.getOrElse("DELPHI_ELASTIC_URI", "elasticsearch://localhost:9200")), + val esProjectIndex: IndexAndType = "delphi" / "project", + + //Actor system configuration + val elasticActorPoolSize: Int = 8 + ) { } diff --git a/src/main/scala/de/upb/cs/swt/delphi/webapi/ElasticActor.scala b/src/main/scala/de/upb/cs/swt/delphi/webapi/ElasticActor.scala index a0792bf..d79df4c 100644 --- a/src/main/scala/de/upb/cs/swt/delphi/webapi/ElasticActor.scala +++ b/src/main/scala/de/upb/cs/swt/delphi/webapi/ElasticActor.scala @@ -1,15 +1,17 @@ package de.upb.cs.swt.delphi.webapi -import akka.actor.{Actor, ActorLogging, Props, ReceiveTimeout} +import akka.actor.{Actor, ActorLogging, Props} import com.sksamuel.elastic4s.IndexAndType import com.sksamuel.elastic4s.http.ElasticDsl._ import com.sksamuel.elastic4s.http.HttpClient import de.upb.cs.swt.delphi.webapi.ElasticActor.GetSource +import scala.concurrent.ExecutionContext import scala.concurrent.duration._ class ElasticActor(configuration: Configuration) extends Actor with ActorLogging{ + implicit val executionContext: ExecutionContext = context.system.dispatchers.lookup("elasticsearch-handling-dispatcher") val client = HttpClient(configuration.elasticsearchClientUri) override def preStart(): Unit = log.info("Search actor started") @@ -26,9 +28,7 @@ class ElasticActor(configuration: Configuration) extends Actor with ActorLogging case Left(_) => Option.empty } sender().tell(source, context.self) - context.stop(self) } - case ReceiveTimeout => context.stop(self) } } diff --git a/src/main/scala/de/upb/cs/swt/delphi/webapi/ElasticActorManager.scala b/src/main/scala/de/upb/cs/swt/delphi/webapi/ElasticActorManager.scala index 756f2f3..77882b0 100644 --- a/src/main/scala/de/upb/cs/swt/delphi/webapi/ElasticActorManager.scala +++ b/src/main/scala/de/upb/cs/swt/delphi/webapi/ElasticActorManager.scala @@ -1,13 +1,21 @@ package de.upb.cs.swt.delphi.webapi -import akka.actor.{Actor, ActorLogging, Props} -import com.sksamuel.elastic4s.http.ElasticDsl._ +import akka.actor.{Actor, ActorLogging, Props, Terminated} +import akka.routing.{ActorRefRoutee, RoundRobinRoutingLogic, Router} import de.upb.cs.swt.delphi.webapi.ElasticActorManager.{Enqueue, Retrieve} import de.upb.cs.swt.delphi.webapi.ElasticActor.GetSource class ElasticActorManager(configuration: Configuration) extends Actor with ActorLogging{ - private val index = "delphi" / "project" + private val index = configuration.esProjectIndex + private var elasticRouter = { + val routees = Vector.fill(configuration.elasticActorPoolSize) { + val r = context.actorOf(ElasticActor.props(configuration)) + context watch r + ActorRefRoutee(r) + } + Router(RoundRobinRoutingLogic(), routees) + } override def preStart(): Unit = log.info("Actor manager started") override def postStop(): Unit = log.info("Actor manager shut down") @@ -15,12 +23,17 @@ class ElasticActorManager(configuration: Configuration) extends Actor with Actor override def receive = { case Retrieve(id) => getSource(id) case Enqueue(id) => getSource(id) + case Terminated(id) => { + elasticRouter.removeRoutee(id) + val r = context.actorOf(ElasticActor.props(configuration)) + context watch r + elasticRouter = elasticRouter.addRoutee(r) + } } private def getSource(id: String) = { - log.info("Creating actor to search for entry {}", id) - val retrieveActor = context.actorOf(ElasticActor.props(configuration)) - retrieveActor forward GetSource(id, index) + log.info("Forwarding search for entry {}", id) + elasticRouter.route(GetSource(id, index), sender()) } } @@ -28,6 +41,8 @@ object ElasticActorManager{ def props(configuration: Configuration) : Props = Props(new ElasticActorManager(configuration)) .withMailbox("es-priority-mailbox") + sealed trait ElasticMessage + final case class Retrieve(id: String) extends ElasticMessage final case class Enqueue(id: String) extends ElasticMessage } \ No newline at end of file diff --git a/src/main/scala/de/upb/cs/swt/delphi/webapi/ElasticMessage.scala b/src/main/scala/de/upb/cs/swt/delphi/webapi/ElasticMessage.scala deleted file mode 100644 index f6aae33..0000000 --- a/src/main/scala/de/upb/cs/swt/delphi/webapi/ElasticMessage.scala +++ /dev/null @@ -1,3 +0,0 @@ -package de.upb.cs.swt.delphi.webapi - -trait ElasticMessage diff --git a/src/main/scala/de/upb/cs/swt/delphi/webapi/ElasticRequestLimiter.scala b/src/main/scala/de/upb/cs/swt/delphi/webapi/ElasticRequestLimiter.scala index df9998b..4395630 100644 --- a/src/main/scala/de/upb/cs/swt/delphi/webapi/ElasticRequestLimiter.scala +++ b/src/main/scala/de/upb/cs/swt/delphi/webapi/ElasticRequestLimiter.scala @@ -5,6 +5,7 @@ import akka.actor.{Actor, ActorLogging, ActorRef, Props} import akka.actor.Timers import akka.http.scaladsl.model.RemoteAddress import de.upb.cs.swt.delphi.webapi.ElasticRequestLimiter._ +import de.upb.cs.swt.delphi.webapi.ElasticActorManager.ElasticMessage import scala.concurrent.duration._ import scala.collection.mutable From b933a800ccd8e4d09f09a84d53ee2928970952b1 Mon Sep 17 00:00:00 2001 From: almacken Date: Wed, 13 Jun 2018 14:41:21 -0600 Subject: [PATCH 7/7] Improved priority handling --- .../cs/swt/delphi/webapi/ElasticActor.scala | 30 ++++++++++--------- .../delphi/webapi/ElasticActorManager.scala | 18 +++++------ 2 files changed, 23 insertions(+), 25 deletions(-) diff --git a/src/main/scala/de/upb/cs/swt/delphi/webapi/ElasticActor.scala b/src/main/scala/de/upb/cs/swt/delphi/webapi/ElasticActor.scala index d79df4c..afe76fe 100644 --- a/src/main/scala/de/upb/cs/swt/delphi/webapi/ElasticActor.scala +++ b/src/main/scala/de/upb/cs/swt/delphi/webapi/ElasticActor.scala @@ -4,12 +4,12 @@ import akka.actor.{Actor, ActorLogging, Props} import com.sksamuel.elastic4s.IndexAndType import com.sksamuel.elastic4s.http.ElasticDsl._ import com.sksamuel.elastic4s.http.HttpClient -import de.upb.cs.swt.delphi.webapi.ElasticActor.GetSource +import de.upb.cs.swt.delphi.webapi.ElasticActorManager.{Enqueue, Retrieve} import scala.concurrent.ExecutionContext import scala.concurrent.duration._ -class ElasticActor(configuration: Configuration) extends Actor with ActorLogging{ +class ElasticActor(configuration: Configuration, index: IndexAndType) extends Actor with ActorLogging{ implicit val executionContext: ExecutionContext = context.system.dispatchers.lookup("elasticsearch-handling-dispatcher") val client = HttpClient(configuration.elasticsearchClientUri) @@ -19,21 +19,23 @@ class ElasticActor(configuration: Configuration) extends Actor with ActorLogging context.setReceiveTimeout(2 seconds) override def receive = { - case GetSource(id, index) => { - log.info("Executing get on entry {}", id) - def source = client.execute{ - get(id).from(index) - }.await match { - case Right(res) => res.body.get - case Left(_) => Option.empty - } - sender().tell(source, context.self) + case Enqueue(id) => getSource(id) + case Retrieve(id) => getSource(id) + } + + private def getSource(id: String) = { + log.info("Executing get on entry {}", id) + def source = client.execute{ + get(id).from(index) + }.await match { + case Right(res) => res.body.get + case Left(_) => Option.empty } + sender().tell(source, context.self) } } object ElasticActor{ - def props(configuration: Configuration) : Props = Props(new ElasticActor(configuration)) - - final case class GetSource(id: String, index: IndexAndType) + def props(configuration: Configuration, index: IndexAndType) : Props = Props(new ElasticActor(configuration, index)) + .withMailbox("es-priority-mailbox") } diff --git a/src/main/scala/de/upb/cs/swt/delphi/webapi/ElasticActorManager.scala b/src/main/scala/de/upb/cs/swt/delphi/webapi/ElasticActorManager.scala index 77882b0..aa8f6fc 100644 --- a/src/main/scala/de/upb/cs/swt/delphi/webapi/ElasticActorManager.scala +++ b/src/main/scala/de/upb/cs/swt/delphi/webapi/ElasticActorManager.scala @@ -2,15 +2,14 @@ package de.upb.cs.swt.delphi.webapi import akka.actor.{Actor, ActorLogging, Props, Terminated} import akka.routing.{ActorRefRoutee, RoundRobinRoutingLogic, Router} -import de.upb.cs.swt.delphi.webapi.ElasticActorManager.{Enqueue, Retrieve} -import de.upb.cs.swt.delphi.webapi.ElasticActor.GetSource +import de.upb.cs.swt.delphi.webapi.ElasticActorManager.ElasticMessage class ElasticActorManager(configuration: Configuration) extends Actor with ActorLogging{ private val index = configuration.esProjectIndex private var elasticRouter = { val routees = Vector.fill(configuration.elasticActorPoolSize) { - val r = context.actorOf(ElasticActor.props(configuration)) + val r = context.actorOf(ElasticActor.props(configuration, index)) context watch r ActorRefRoutee(r) } @@ -21,20 +20,17 @@ class ElasticActorManager(configuration: Configuration) extends Actor with Actor override def postStop(): Unit = log.info("Actor manager shut down") override def receive = { - case Retrieve(id) => getSource(id) - case Enqueue(id) => getSource(id) + case em: ElasticMessage => { + log.info("Forwarding request {} to ElasticActor", em) + elasticRouter.route(em, sender()) + } case Terminated(id) => { elasticRouter.removeRoutee(id) - val r = context.actorOf(ElasticActor.props(configuration)) + val r = context.actorOf(ElasticActor.props(configuration, index)) context watch r elasticRouter = elasticRouter.addRoutee(r) } } - - private def getSource(id: String) = { - log.info("Forwarding search for entry {}", id) - elasticRouter.route(GetSource(id, index), sender()) - } } object ElasticActorManager{