@@ -42,7 +42,7 @@ import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.deploy.{LocalSparkCluster, SparkHadoopUtil}
 import org.apache.spark.deploy.StandaloneResourceUtils._
-import org.apache.spark.executor.ExecutorMetrics
+import org.apache.spark.executor.{ExecutorMetrics, ExecutorMetricsSource}
 import org.apache.spark.input.{FixedLengthBinaryInputFormat, PortableDataStream, StreamInputFormat, WholeTextFileInputFormat}
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config._
@@ -551,9 +551,16 @@ class SparkContext(config: SparkConf) extends Logging {
     _dagScheduler = new DAGScheduler(this)
     _heartbeatReceiver.ask[Boolean](TaskSchedulerIsSet)
 
+    val _executorMetricsSource =
+      if (_conf.get(METRICS_EXECUTORMETRICS_SOURCE_ENABLED)) {
+        Some(new ExecutorMetricsSource)
+      } else {
+        None
+      }
+
     // create and start the heartbeater for collecting memory metrics
     _heartbeater = new Heartbeater(
-      () => SparkContext.this.reportHeartBeat(),
+      () => SparkContext.this.reportHeartBeat(_executorMetricsSource),
       "driver-heartbeater",
       conf.get(EXECUTOR_HEARTBEAT_INTERVAL))
     _heartbeater.start()
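The hunk above gates creation of an executor-metrics source on a config flag and threads the resulting Option into the driver heartbeat callback. A minimal, self-contained sketch of that pattern, under stated assumptions (MetricsSnapshotSource and the demo.metrics.enabled property are illustrative stand-ins, not Spark's API):

import java.util.concurrent.{Executors, TimeUnit}

// Illustrative stand-in for ExecutorMetricsSource: it just holds the latest snapshot.
class MetricsSnapshotSource {
  @volatile private var snapshot: Map[String, Long] = Map.empty
  def updateMetricsSnapshot(metrics: Map[String, Long]): Unit = { snapshot = metrics }
  def current: Map[String, Long] = snapshot
}

object HeartbeatSketch {
  private def reportHeartBeat(source: Option[MetricsSnapshotSource]): Unit = {
    val currentMetrics = Map("heapUsed" -> Runtime.getRuntime.totalMemory())
    // As in the diff: update the snapshot only when the source was created.
    source.foreach(_.updateMetricsSnapshot(currentMetrics))
  }

  def main(args: Array[String]): Unit = {
    // Gate creation on a flag, mirroring METRICS_EXECUTORMETRICS_SOURCE_ENABLED.
    val enabled = sys.props.getOrElse("demo.metrics.enabled", "true").toBoolean
    val source = if (enabled) Some(new MetricsSnapshotSource) else None

    // Periodic heartbeat, standing in for Spark's Heartbeater.
    val heartbeater = Executors.newSingleThreadScheduledExecutor()
    heartbeater.scheduleAtFixedRate(
      new Runnable { def run(): Unit = reportHeartBeat(source) },
      0L, 1L, TimeUnit.SECONDS)
    Thread.sleep(3000)
    heartbeater.shutdown()
  }
}

Using Option rather than a nullable field keeps the disabled case explicit at every use site: each consumer is a foreach that becomes a no-op when the flag is off.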
@@ -622,6 +629,7 @@ class SparkContext(config: SparkConf) extends Logging {
     _env.metricsSystem.registerSource(_dagScheduler.metricsSource)
     _env.metricsSystem.registerSource(new BlockManagerSource(_env.blockManager))
     _env.metricsSystem.registerSource(new JVMCPUSource())
+    _executorMetricsSource.foreach(_.register(_env.metricsSystem))
     _executorAllocationManager.foreach { e =>
       _env.metricsSystem.registerSource(e.executorAllocationManagerSource)
     }
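The added registration line follows the shape of Spark's metrics sources: a source exposes a name plus a Dropwizard MetricRegistry, and register hands it to the MetricsSystem. A hedged sketch with a stand-in trait (Spark's own Source trait is package-private, so the trait, class, and gauge names below are illustrative):

import com.codahale.metrics.{Gauge, MetricRegistry}

// Stand-in for Spark's package-private metrics Source trait.
trait Source {
  def sourceName: String
  def metricRegistry: MetricRegistry
}

class DemoExecutorMetricsSource extends Source {
  override val sourceName: String = "DemoExecutorMetrics"
  override val metricRegistry: MetricRegistry = new MetricRegistry

  @volatile private var heapUsed: Long = 0L

  // Gauges are polled by the sink, so they always read the latest snapshot.
  metricRegistry.register(MetricRegistry.name("heapUsed"),
    new Gauge[Long] { override def getValue: Long = heapUsed })

  def updateMetricsSnapshot(value: Long): Unit = { heapUsed = value }
}

Because the source lives in an Option, the foreach above stays a no-op whenever the feature flag left it empty.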
@@ -1525,17 +1533,17 @@ class SparkContext(config: SparkConf) extends Logging {
    */
   def addFile(path: String, recursive: Boolean): Unit = {
     val uri = new Path(path).toUri
-    val schemeCorrectedPath = uri.getScheme match {
-      case null => new File(path).getCanonicalFile.toURI.toString
+    val schemeCorrectedURI = uri.getScheme match {
+      case null => new File(path).getCanonicalFile.toURI
       case "local" =>
         logWarning("File with 'local' scheme is not supported to add to file server, since " +
           "it is already available on every node.")
         return
-      case _ => path
+      case _ => uri
     }
 
-    val hadoopPath = new Path(schemeCorrectedPath)
-    val scheme = new URI(schemeCorrectedPath).getScheme
+    val hadoopPath = new Path(schemeCorrectedURI)
+    val scheme = schemeCorrectedURI.getScheme
     if (!Array("http", "https", "ftp").contains(scheme)) {
       val fs = hadoopPath.getFileSystem(hadoopConfiguration)
       val isDir = fs.getFileStatus(hadoopPath).isDirectory
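The match above now yields a URI rather than a String, and the default case falls through to the parsed uri instead of the raw path. A standalone paraphrase of that logic, using Option in place of the early return (names and sample paths are illustrative):

import java.io.File
import org.apache.hadoop.fs.Path

object SchemeCorrectionSketch {
  def schemeCorrectedURI(path: String): Option[java.net.URI] = {
    val uri = new Path(path).toUri
    uri.getScheme match {
      case null    => Some(new File(path).getCanonicalFile.toURI) // bare path -> absolute file: URI
      case "local" => None                                        // already on every node
      case _       => Some(uri)                                   // keep the parsed URI
    }
  }

  def main(args: Array[String]): Unit = {
    println(schemeCorrectedURI("data/words.txt"))         // Some(file:/<cwd>/data/words.txt)
    println(schemeCorrectedURI("hdfs:///tmp/words.txt"))  // Some(hdfs:///tmp/words.txt)
    println(schemeCorrectedURI("local:///opt/words.txt")) // None
  }
}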
@@ -1555,7 +1563,11 @@ class SparkContext(config: SparkConf) extends Logging {
     val key = if (!isLocal && scheme == "file") {
       env.rpcEnv.fileServer.addFile(new File(uri.getPath))
     } else {
-      schemeCorrectedPath
+      if (uri.getScheme == null) {
+        schemeCorrectedURI.toString
+      } else {
+        path
+      }
     }
     val timestamp = System.currentTimeMillis
     if (addedFiles.putIfAbsent(key, timestamp).isEmpty) {
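With this change, only scheme-less inputs take the canonicalized-URI key; any explicit scheme keeps the caller's original string, so keys for already-qualified paths do not change form. A hedged mirror of that selection for the non-file-server branch (sample paths illustrative):

import java.io.File
import org.apache.hadoop.fs.Path

object FileKeySketch {
  // Mirrors the else-branch key selection above.
  def key(path: String): String = {
    val uri = new Path(path).toUri
    if (uri.getScheme == null) new File(path).getCanonicalFile.toURI.toString
    else path
  }

  def main(args: Array[String]): Unit = {
    println(key("data/words.txt"))        // file:/<cwd>/data/words.txt
    println(key("hdfs:///tmp/words.txt")) // hdfs:///tmp/words.txt (unchanged)
  }
}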
@@ -1848,7 +1860,7 @@ class SparkContext(config: SparkConf) extends Logging {
 
     def checkRemoteJarFile(path: String): String = {
       val hadoopPath = new Path(path)
-      val scheme = new URI(path).getScheme
+      val scheme = hadoopPath.toUri.getScheme
       if (!Array("http", "https", "ftp").contains(scheme)) {
         try {
           val fs = hadoopPath.getFileSystem(hadoopConfiguration)
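This hunk, like the addFile and addJar changes, derives the scheme from a Hadoop Path instead of new URI(path). The practical difference shows up with unencoded characters such as spaces, which java.net.URI rejects while Path percent-encodes. A small demonstration (requires hadoop-common on the classpath; the path is illustrative):

import java.net.{URI, URISyntaxException}
import org.apache.hadoop.fs.Path

object UriVsPathSketch {
  def main(args: Array[String]): Unit = {
    val p = "/tmp/dir with space/file.txt"
    println(new Path(p).toUri)  // /tmp/dir%20with%20space/file.txt (encoded for us)
    try { new URI(p) } catch {
      case e: URISyntaxException => println(s"URI rejects it: ${e.getMessage}")
    }
  }
}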
@@ -1870,21 +1882,21 @@ class SparkContext(config: SparkConf) extends Logging {
       }
     }
 
-    if (path == null) {
-      logWarning("null specified as parameter to addJar")
+    if (path == null || path.isEmpty) {
+      logWarning("null or empty path specified as parameter to addJar")
     } else {
       val key = if (path.contains("\\")) {
         // For local paths with backslashes on Windows, URI throws an exception
         addLocalJarFile(new File(path))
       } else {
-        val uri = new URI(path)
+        val uri = new Path(path).toUri
         // SPARK-17650: Make sure this is a valid URL before adding it to the list of dependencies
         Utils.validateURL(uri)
         uri.getScheme match {
           // A JAR file which exists only on the driver node
           case null =>
             // SPARK-22585 path without schema is not url encoded
-            addLocalJarFile(new File(uri.getRawPath))
+            addLocalJarFile(new File(uri.getPath))
           // A JAR file which exists only on the driver node
           case "file" => addLocalJarFile(new File(uri.getPath))
           // A JAR file which exists locally on every worker node
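Because new Path(path).toUri percent-encodes, getRawPath would now hand File an encoded name; getPath decodes back to the on-disk path, which is why the SPARK-22585 branch switches accessors. A quick illustration (path illustrative):

import org.apache.hadoop.fs.Path

object RawVsDecodedSketch {
  def main(args: Array[String]): Unit = {
    val uri = new Path("/lib/my lib.jar").toUri
    println(uri.getRawPath) // /lib/my%20lib.jar -- not a real file name
    println(uri.getPath)    // /lib/my lib.jar   -- the on-disk path
  }
}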
@@ -2473,8 +2485,10 @@ class SparkContext(config: SparkConf) extends Logging {
   }
 
   /** Reports heartbeat metrics for the driver. */
-  private def reportHeartBeat(): Unit = {
+  private def reportHeartBeat(executorMetricsSource: Option[ExecutorMetricsSource]): Unit = {
     val currentMetrics = ExecutorMetrics.getCurrentMetrics(env.memoryManager)
+    executorMetricsSource.foreach(_.updateMetricsSnapshot(currentMetrics))
+
     val driverUpdates = new HashMap[(Int, Int), ExecutorMetrics]
     // In the driver, we do not track per-stage metrics, so use a dummy stage for the key
     driverUpdates.put(EventLoggingListener.DRIVER_STAGE_KEY, new ExecutorMetrics(currentMetrics))