@@ -20,33 +20,42 @@ package org.apache.spark
 import scala.language.implicitConversions
 
 import java.io._
+import java.lang.reflect.Constructor
 import java.net.URI
 import java.util.{Arrays, Properties, UUID}
 import java.util.concurrent.atomic.AtomicInteger
 import java.util.UUID.randomUUID
+
 import scala.collection.{Map, Set}
 import scala.collection.JavaConversions._
 import scala.collection.generic.Growable
 import scala.collection.mutable.HashMap
 import scala.reflect.{ClassTag, classTag}
+
+import akka.actor.Props
+
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
-import org.apache.hadoop.io.{ArrayWritable, BooleanWritable, BytesWritable, DoubleWritable, FloatWritable, IntWritable, LongWritable, NullWritable, Text, Writable}
-import org.apache.hadoop.mapred.{FileInputFormat, InputFormat, JobConf, SequenceFileInputFormat, TextInputFormat}
+import org.apache.hadoop.io.{ArrayWritable, BooleanWritable, BytesWritable, DoubleWritable,
+  FloatWritable, IntWritable, LongWritable, NullWritable, Text, Writable}
+import org.apache.hadoop.mapred.{FileInputFormat, InputFormat, JobConf, SequenceFileInputFormat,
+  TextInputFormat}
 import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat, Job => NewHadoopJob}
 import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => NewFileInputFormat}
+
 import org.apache.mesos.MesosNativeLibrary
-import akka.actor.Props
 
 import org.apache.spark.annotation.{DeveloperApi, Experimental}
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.deploy.{LocalSparkCluster, SparkHadoopUtil}
 import org.apache.spark.executor.TriggerThreadDump
-import org.apache.spark.input.{StreamInputFormat, PortableDataStream, WholeTextFileInputFormat, FixedLengthBinaryInputFormat}
+import org.apache.spark.input.{StreamInputFormat, PortableDataStream, WholeTextFileInputFormat,
+  FixedLengthBinaryInputFormat}
 import org.apache.spark.partial.{ApproximateEvaluator, PartialResult}
 import org.apache.spark.rdd._
 import org.apache.spark.scheduler._
-import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend, SparkDeploySchedulerBackend, SimrSchedulerBackend}
+import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend,
+  SparkDeploySchedulerBackend, SimrSchedulerBackend}
 import org.apache.spark.scheduler.cluster.mesos.{CoarseMesosSchedulerBackend, MesosSchedulerBackend}
 import org.apache.spark.scheduler.local.LocalBackend
 import org.apache.spark.storage._
@@ -387,9 +396,6 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
   }
   executorAllocationManager.foreach(_.start())
 
-  // At this point, all relevant SparkListeners have been registered, so begin releasing events
-  listenerBus.start()
-
   private[spark] val cleaner: Option[ContextCleaner] = {
     if (conf.getBoolean("spark.cleaner.referenceTracking", true)) {
       Some(new ContextCleaner(this))
@@ -399,6 +405,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
   }
   cleaner.foreach(_.start())
 
+  setupAndStartListenerBus()
   postEnvironmentUpdate()
   postApplicationStart()
 
@@ -1017,12 +1024,48 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
    * filesystems), or an HTTP, HTTPS or FTP URI. To access the file in Spark jobs,
    * use `SparkFiles.get(fileName)` to find its download location.
    */
-  def addFile(path: String) {
+  def addFile(path: String): Unit = {
+    addFile(path, false)
+  }
+
+  /**
+   * Add a file to be downloaded with this Spark job on every node.
+   * The `path` passed can be either a local file, a file in HDFS (or other Hadoop-supported
+   * filesystems), or an HTTP, HTTPS or FTP URI. To access the file in Spark jobs,
+   * use `SparkFiles.get(fileName)` to find its download location.
+   *
+   * A directory can be given if the recursive option is set to true. Currently directories are
+   * only supported for Hadoop-supported filesystems.
+   */
+  def addFile(path: String, recursive: Boolean): Unit = {
     val uri = new URI(path)
-    val key = uri.getScheme match {
-      case null | "file" => env.httpFileServer.addFile(new File(uri.getPath))
-      case "local" => "file:" + uri.getPath
-      case _ => path
+    val schemeCorrectedPath = uri.getScheme match {
+      case null | "local" => "file:" + uri.getPath
+      case _ => path
+    }
+
+    val hadoopPath = new Path(schemeCorrectedPath)
+    val scheme = new URI(schemeCorrectedPath).getScheme
+    if (!Array("http", "https", "ftp").contains(scheme)) {
+      val fs = hadoopPath.getFileSystem(hadoopConfiguration)
+      if (!fs.exists(hadoopPath)) {
+        throw new FileNotFoundException(s"Added file $hadoopPath does not exist.")
+      }
+      val isDir = fs.isDirectory(hadoopPath)
+      if (!isLocal && scheme == "file" && isDir) {
+        throw new SparkException(s"addFile does not support local directories when not running " +
+          "local mode.")
+      }
+      if (!recursive && isDir) {
+        throw new SparkException(s"Added file $hadoopPath is a directory and recursive is not " +
+          "turned on.")
+      }
+    }
+
+    val key = if (!isLocal && scheme == "file") {
+      env.httpFileServer.addFile(new File(uri.getPath))
+    } else {
+      schemeCorrectedPath
     }
     val timestamp = System.currentTimeMillis
     addedFiles(key) = timestamp
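For reference, a minimal usage sketch of the new `addFile(path, recursive)` overload. The object name, HDFS paths, and file names below are placeholders and not part of this change; a real run would need those paths to exist.

```scala
import org.apache.spark.{SparkConf, SparkContext, SparkFiles}

object AddFileExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("addFile-example").setMaster("local[2]"))

    // Single file: same behavior as the existing addFile(path).
    sc.addFile("hdfs:///data/lookup.csv")

    // Whole directory: only allowed with recursive = true, and only for
    // Hadoop-supported filesystems; a local ("file:") directory is rejected
    // unless the context is running in local mode.
    sc.addFile("hdfs:///data/lookup-tables", recursive = true)

    // On executors (or the driver in local mode), resolve the download location.
    sc.parallelize(1 to 4).foreach { _ =>
      val localPath = SparkFiles.get("lookup.csv")
      println(s"lookup.csv is cached at $localPath")
    }
    sc.stop()
  }
}
```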
@@ -1563,6 +1606,58 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
   /** Register a new RDD, returning its RDD ID */
   private[spark] def newRddId(): Int = nextRddId.getAndIncrement()
 
+  /**
+   * Registers listeners specified in spark.extraListeners, then starts the listener bus.
+   * This should be called after all internal listeners have been registered with the listener bus
+   * (e.g. after the web UI and event logging listeners have been registered).
+   */
+  private def setupAndStartListenerBus(): Unit = {
+    // Use reflection to instantiate listeners specified via `spark.extraListeners`
+    try {
+      val listenerClassNames: Seq[String] =
+        conf.get("spark.extraListeners", "").split(',').map(_.trim).filter(_ != "")
+      for (className <- listenerClassNames) {
+        // Use reflection to find the right constructor
+        val constructors = {
+          val listenerClass = Class.forName(className)
+          listenerClass.getConstructors.asInstanceOf[Array[Constructor[_ <: SparkListener]]]
+        }
+        val constructorTakingSparkConf = constructors.find { c =>
+          c.getParameterTypes.sameElements(Array(classOf[SparkConf]))
+        }
+        lazy val zeroArgumentConstructor = constructors.find { c =>
+          c.getParameterTypes.isEmpty
+        }
+        val listener: SparkListener = {
+          if (constructorTakingSparkConf.isDefined) {
+            constructorTakingSparkConf.get.newInstance(conf)
+          } else if (zeroArgumentConstructor.isDefined) {
+            zeroArgumentConstructor.get.newInstance()
+          } else {
+            throw new SparkException(
+              s"$className did not have a zero-argument constructor or a" +
+                " single-argument constructor that accepts SparkConf. Note: if the class is" +
+                " defined inside of another Scala class, then its constructors may accept an" +
+                " implicit parameter that references the enclosing class; in this case, you must" +
+                " define the listener as a top-level class in order to prevent this extra" +
+                " parameter from breaking Spark's ability to find a valid constructor.")
+          }
+        }
+        listenerBus.addListener(listener)
+        logInfo(s"Registered listener $className")
+      }
+    } catch {
+      case e: Exception =>
+        try {
+          stop()
+        } finally {
+          throw new SparkException(s"Exception when registering SparkListener", e)
+        }
+    }
+
+    listenerBus.start()
+  }
+
   /** Post the application start event */
   private def postApplicationStart() {
     // Note: this code assumes that the task scheduler has been initialized and has contacted
@@ -1582,8 +1677,8 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
     val schedulingMode = getSchedulingMode.toString
     val addedJarPaths = addedJars.keys.toSeq
     val addedFilePaths = addedFiles.keys.toSeq
-    val environmentDetails =
-      SparkEnv.environmentDetails(conf, schedulingMode, addedJarPaths, addedFilePaths)
+    val environmentDetails = SparkEnv.environmentDetails(conf, schedulingMode, addedJarPaths,
+      addedFilePaths)
     val environmentUpdate = SparkListenerEnvironmentUpdate(environmentDetails)
     listenerBus.post(environmentUpdate)
   }