Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
17 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ libraryDependencies ++= Seq(
"com.typesafe.akka" %% "akka-stream" % akkaVersion,
"com.typesafe.akka" %% "akka-stream-testkit" % akkaVersion % Test,
"com.typesafe.akka" %% "akka-slf4j" % akkaVersion,
"com.typesafe.akka" %% "akka-http" % "10.1.3"
"com.typesafe.akka" %% "akka-http" % "10.1.4"
)

libraryDependencies += "ch.qos.logback" % "logback-classic" % "1.2.3" % Runtime
Expand Down Expand Up @@ -93,6 +93,8 @@ libraryDependencies ++= Seq(
libraryDependencies += "org.apache.maven.indexer" % "indexer-reader" % "6.0.0"
libraryDependencies += "org.apache.maven.indexer" % "indexer-core" % "6.0.0"

libraryDependencies += "ch.qos.logback" % "logback-classic" % "1.2.3"

// Pinning secure versions of insecure transitive libraryDependencies
// Please update when updating dependencies above (including Play plugin)
libraryDependencies ++= Seq(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,11 @@ class Configuration {
val limit : Int = 50
val throttle : Throttle = Throttle(5, 30 second, 5, ThrottleMode.shaping)

val tempFileStorage : String = "temp/"

val elasticActorPoolSize : Int = 4
val callGraphStreamPoolSize : Int = 4

case class Throttle(element : Int, per : FiniteDuration, maxBurst : Int, mode : ThrottleMode)
}

2 changes: 1 addition & 1 deletion src/main/scala/de/upb/cs/swt/delphi/crawler/Crawler.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ package de.upb.cs.swt.delphi.crawler

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import com.sksamuel.elastic4s.http.{ElasticClient, HttpClient}
import com.sksamuel.elastic4s.http.ElasticClient
import de.upb.cs.swt.delphi.crawler.control.Server
import de.upb.cs.swt.delphi.crawler.discovery.maven.MavenCrawlActor
import de.upb.cs.swt.delphi.crawler.discovery.maven.MavenCrawlActor.Start
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ package de.upb.cs.swt.delphi.crawler

import akka.actor.{ActorRef, ActorSystem}
import com.sksamuel.elastic4s.ElasticsearchClientUri
import com.sksamuel.elastic4s.http.{ElasticClient, HttpClient}
import com.sksamuel.elastic4s.http.ElasticClient
import de.upb.cs.swt.delphi.crawler.storage.ElasticActor

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ package de.upb.cs.swt.delphi.crawler.discovery.maven

import akka.actor.{Actor, ActorLogging, ActorRef, Props}
import akka.stream.ActorMaterializer
import com.sksamuel.elastic4s.http.{ElasticClient, HttpClient}
import com.sksamuel.elastic4s.http.ElasticClient
import de.upb.cs.swt.delphi.crawler.Configuration
import de.upb.cs.swt.delphi.crawler.discovery.maven.MavenCrawlActor.Start
import de.upb.cs.swt.delphi.crawler.storage.ArtifactExistsQuery
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ case class MavenIdentifier(val repository: String, val groupId: String, val arti
repository + ":" + groupId + ":" + artifactId + ":" + version
}

override val toString: String = groupId + ":" + artifactId + ":" + version

def toJarLocation : URI = {
constructArtifactBaseUri().resolve(artifactId + "-" + version + ".jar")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,15 @@
package de.upb.cs.swt.delphi.crawler.preprocessing

import java.io.BufferedInputStream
import java.net.URI
import java.net.{URI, URL}

import de.upb.cs.swt.delphi.crawler.discovery.maven.{HttpResourceHandler, MavenIdentifier}

class MavenDownloader(identifier: MavenIdentifier) {
val http = new HttpResourceHandler(constructArtifactBaseUri())
val pomResource = http.locate(pomFilename(identifier))
val jarResource = http.locate(jarFilename(identifier))
val metaResource = http.locate("maven-metadata.xml")

/**
* Construct url from maven identifier
Expand All @@ -34,20 +35,26 @@ class MavenDownloader(identifier: MavenIdentifier) {
new URI(identifier.repository)
.resolve(identifier.groupId.replace('.', '/') + "/")
.resolve(identifier.artifactId + "/")
.resolve(identifier.version + "/")
// .resolve(identifier.version + "/")

def constructArtifactUrl(): URL =
constructArtifactBaseUri().resolve(jarFilename(identifier)).toURL

def pomFilename(identifier: MavenIdentifier): String =
identifier.artifactId + "-" + identifier.version + ".pom"
identifier.version + "/" + identifier.artifactId + "-" + identifier.version + ".pom"

def jarFilename(identifier: MavenIdentifier): String =
identifier.artifactId + "-" + identifier.version + ".jar"
identifier.version + "/" + identifier.artifactId + "-" + identifier.version + ".jar"

def downloadJar(): JarFile = {
JarFile(new BufferedInputStream(jarResource.read()))
JarFile(jarResource.read(), constructArtifactUrl())
}

def downloadPom(): PomFile= {
PomFile(new BufferedInputStream(pomResource.read()))
def downloadPom(): PomFile = {
PomFile(pomResource.read())
}

def downloadMeta(): MetaFile = {
MetaFile(metaResource.read())
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,24 +17,35 @@
package de.upb.cs.swt.delphi.crawler.preprocessing

import akka.actor.{Actor, ActorLogging, ActorRef, Props}
import akka.util.Timeout
import akka.pattern.ask
import akka.routing.{BalancingPool, RoundRobinPool}
import akka.util.Timeout
import com.sksamuel.elastic4s.http.ElasticClient
import de.upb.cs.swt.delphi.crawler.Configuration
import de.upb.cs.swt.delphi.crawler.discovery.maven.MavenIdentifier
import de.upb.cs.swt.delphi.crawler.processing.CallGraphStream
import de.upb.cs.swt.delphi.crawler.storage.ElasticActor

import scala.concurrent.duration._
import scala.util.{Success, Try}
import scala.util.Success

class PreprocessingDispatchActor(configuration : Configuration, nextStep : ActorRef, elasticActor : ActorRef) extends Actor with ActorLogging {

val elasticPool = context.actorOf(RoundRobinPool(configuration.elasticActorPoolSize)
.props(ElasticActor.props(ElasticClient(configuration.elasticsearchClientUri))))
val callGraphPool = context.actorOf(BalancingPool(configuration.callGraphStreamPoolSize)
.props(CallGraphStream.props(configuration)))

override def receive: Receive = {
case m : MavenIdentifier => {

implicit val ec = context.dispatcher

// Start creation of base record
elasticActor forward m
elasticPool forward m

// Create call graphs for each project
callGraphPool ! m

// Transform maven identifier into maven artifact
implicit val timeout = Timeout(5.seconds)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package de.upb.cs.swt.delphi.crawler

import java.io.InputStream
import java.net.URL


/**
Expand All @@ -28,12 +29,18 @@ package object preprocessing {
* Used for identifcation (Pattern matching) of jar file
* @param is jar file stream
*/
case class JarFile(is: InputStream)
case class JarFile(is: InputStream, url: URL)

/**
* Used for identification (Pattern matching) of pom file
* @param is pom file stream
*/
case class PomFile(is: InputStream)

/**
* Used for identification (Pattern matching) of metadata file
* @param is metadata file stream
*/
case class MetaFile(is: InputStream)

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
package de.upb.cs.swt.delphi.crawler.processing


import java.io.FileNotFoundException

import akka.actor.{Actor, ActorLogging, ActorRef, Props}
import akka.pattern.ask
import akka.stream._
import akka.stream.scaladsl.GraphDSL.Implicits._
import akka.stream.scaladsl.{Flow, GraphDSL, Keep, RunnableGraph, Sink, Source}
import akka.util.Timeout
import akka.{Done, NotUsed}
import com.sksamuel.elastic4s.http.ElasticClient
import de.upb.cs.swt.delphi.crawler.Configuration
import de.upb.cs.swt.delphi.crawler.discovery.maven.MavenIdentifier
import de.upb.cs.swt.delphi.crawler.preprocessing.{JarFile, MavenDownloader, PomFile}
import de.upb.cs.swt.delphi.crawler.processing.CallGraphStream.MappedEdge
import de.upb.cs.swt.delphi.crawler.storage.ElasticCallGraphActor
import org.opalj.ai.analyses.cg.UnresolvedMethodCall

import scala.concurrent.duration._
import scala.concurrent.{ExecutionContext, Future}
import scala.util.Try

/*
* This component takes a Maven identifier, determines what methods it calls in each of it's dependencies, and
* stores each method called in each dependency in the Elasticsearch database.
*
* To do this, it delegates tasks to five actors, listed below. Check them for more implementation details.
*
* MavenDependencyActor and OpalActor find the dependencies and external calls for a project respectively,
* ElasticEdgeSearchActor and MavenEdgeMappingActor connect them together, and ElasticCallGraphActor adds
* all this information to the database.
*
* Note that to match unresolved method calls to dependencies, ElasticEdgeSearchActor is used first, and
* MavenEdgeMappingActor is used to match any calls ElasticEdgeSearchActor missed.
*/

class CallGraphStream(configuration: Configuration) extends Actor with ActorLogging {

override def receive: Receive = {
case m: MavenIdentifier =>
graphActor forward m
}

implicit val timeout: Timeout = 60 seconds
val decider: Supervision.Decider = {
case e: Exception => {log.warning("Call graph stream threw exception " + e); Supervision.Resume}
}
implicit val materializer: Materializer = ActorMaterializer(ActorMaterializerSettings(context.system).withSupervisionStrategy(decider))
implicit val parallelism = 1
implicit val ec = ExecutionContext.global

val esClient = ElasticClient(configuration.elasticsearchClientUri)

val mavenDependencyActor = context.actorOf(MavenDependencyActor.props(configuration))
val opalActor: ActorRef = context.actorOf(OpalActor.props(configuration))
val mavenEdgeMapActor: ActorRef = context.actorOf(MavenEdgeMappingActor.props(configuration))
val esEdgeSearchActor: ActorRef = context.actorOf(ElasticEdgeSearchActor.props(esClient))
val esPushActor: ActorRef = context.actorOf(ElasticCallGraphActor.props(esClient))

val fileGenFlow: Flow[MavenIdentifier, (PomFile, JarFile, MavenIdentifier), NotUsed] = Flow.fromFunction(fetchFiles)
val edgeSetFlow: Flow[(Set[MavenIdentifier], JarFile, MavenIdentifier), (Set[UnresolvedMethodCall], Set[MavenIdentifier], MavenIdentifier), NotUsed] =
Flow[(Set[MavenIdentifier], JarFile, MavenIdentifier)].mapAsync(parallelism){ case (ix: Set[MavenIdentifier], jf: JarFile, i: MavenIdentifier) =>
(opalActor ? jf).mapTo[Set[UnresolvedMethodCall]].map(cx => (cx, ix, i))}
val dependencyConverter: Flow[(PomFile, JarFile, MavenIdentifier), (Set[MavenIdentifier], JarFile, MavenIdentifier), NotUsed] =
Flow[(PomFile, JarFile, MavenIdentifier)].mapAsync(parallelism){ case (pf, jf, id) => (mavenDependencyActor ? pf).mapTo[Set[MavenIdentifier]].map((_, jf, id))}

val esEdgeMatcher: Flow[(Set[UnresolvedMethodCall], Set[MavenIdentifier], MavenIdentifier), (Set[UnresolvedMethodCall], Set[MavenIdentifier], Set[MappedEdge], MavenIdentifier), NotUsed] =
Flow[(Set[UnresolvedMethodCall], Set[MavenIdentifier], MavenIdentifier)].mapAsync(parallelism){ case (mxIn, ixIn, i) => (esEdgeSearchActor ? (mxIn, ixIn))
.mapTo[(Set[UnresolvedMethodCall], Set[MavenIdentifier], Set[MappedEdge])].map{case (mxOut, ixOut, ex) => (mxOut, ixOut, ex, i)}}
val mavenEdgeMatcher: Flow[(Set[UnresolvedMethodCall], Set[MavenIdentifier], Set[MappedEdge], MavenIdentifier), (Set[MappedEdge], MavenIdentifier), NotUsed] =
Flow[(Set[UnresolvedMethodCall], Set[MavenIdentifier], Set[MappedEdge], MavenIdentifier)].mapAsync(parallelism){ case (mx, ix, ex, i)
=> (mavenEdgeMapActor ? (mx, ix)).mapTo[Set[MappedEdge]].map(me => (me ++ ex, i))}

val esPusherSink: Sink[(Set[MappedEdge], MavenIdentifier), Future[Done]] =
Sink.foreach{ case (ex, i) => (esPushActor ! (i, ex))}

val edgeGeneratingGraph: Flow[MavenIdentifier, (Set[UnresolvedMethodCall], Set[MavenIdentifier], MavenIdentifier), NotUsed] =
Flow.fromGraph(GraphDSL.create() { implicit b =>

val fileGen = b.add(fileGenFlow)
val edgeGen = b.add(edgeSetFlow)

fileGen ~> dependencyConverter.filter{case (ix, jf, i) =>
if(ix.isEmpty) { log.info(i.toString + " not mapped, incomplete POM file."); false }
else { log.info(i.toString + " mapped, valid POM file."); true }} ~> edgeGen.in

FlowShape(fileGen.in, edgeGen.out)
})

val edgeMatchingGraph: Flow[(Set[UnresolvedMethodCall], Set[MavenIdentifier], MavenIdentifier), (Set[MappedEdge], MavenIdentifier), NotUsed] =
Flow.fromGraph(GraphDSL.create() { implicit b =>
val esMatcher = b.add(esEdgeMatcher)
val mvMatcher = b.add(mavenEdgeMatcher)

esMatcher ~> mvMatcher

FlowShape(esMatcher.in, mvMatcher.out)
})

val edgeMappingGraph: Sink[MavenIdentifier, NotUsed] = Sink.fromGraph(GraphDSL.create() { implicit b =>
val edgeGenerator = b.add(edgeGeneratingGraph)

edgeGenerator ~> edgeMatchingGraph ~> esPusherSink

SinkShape(edgeGenerator.in)
})

def fetchFiles(mavenIdentifier: MavenIdentifier): (PomFile, JarFile, MavenIdentifier) = {
val downloader = new MavenDownloader(mavenIdentifier)
def files = Try((downloader.downloadPom(), downloader.downloadJar(), mavenIdentifier))
files match {
case util.Success(f) => f
case util.Failure(e: FileNotFoundException) => {
log.info("{} not mapped, missing POM file", mavenIdentifier.toString)
throw e
}
}
}

val actorSource: Source[MavenIdentifier, ActorRef] = Source.actorRef(5000, OverflowStrategy.dropNew) //We may need to adjust this

val callGraphGraph: RunnableGraph[ActorRef] = actorSource.toMat(edgeMappingGraph)(Keep.left)

val graphActor: ActorRef = callGraphGraph.run()
}

object CallGraphStream{
def props(configuration: Configuration) = Props(new CallGraphStream(configuration))

case class MappedEdge(library: MavenIdentifier, method: String) //I'm not sure if this is the best place to put these
def unresMCtoStr(m: UnresolvedMethodCall): String = m.calleeClass.toJava + ": " + m.calleeDescriptor.toJava(m.calleeName)
}
Loading