")
.replace(/"/g, "\"");
var g = graphlibDot.read(escaped_dot);
- g.graph().rankSep = rankSep;
var renderer = new dagreD3.render();
+ preprocessGraphLayout(g, forJob);
renderer(container, g);
}
@@ -251,50 +249,38 @@ function graphContainer() { return d3.select("#dag-viz-graph"); }
function metadataContainer() { return d3.select("#dag-viz-metadata"); }
/*
- * Helper function to create draw a label for each cluster.
- *
- * We need to do this manually because dagre-d3 does not support labeling clusters.
- * In general, the clustering support for dagre-d3 is quite limited at this point.
+ * Helper function to pre-process the graph layout.
+ * This step is necessary for certain styles that affect the positioning
+ * and sizes of graph elements, e.g. padding, font style, shape.
*/
-function drawClusterLabels(svgContainer, forJob) {
- var clusterLabelSize, stageClusterLabelSize;
+function preprocessGraphLayout(g, forJob) {
+ var nodes = g.nodes();
+ for (var i = 0; i < nodes.length; i++) {
+ var isCluster = g.children(nodes[i]).length > 0;
+ if (!isCluster) {
+ var node = g.node(nodes[i]);
+ if (forJob) {
+ // Do not display RDD name on job page
+ node.shape = "circle";
+ node.labelStyle = "font-size: 0px";
+ } else {
+ node.labelStyle = "font-size: 12px";
+ }
+ node.padding = "5";
+ }
+ }
+ // Curve the edges
+ var edges = g.edges();
+ for (var j = 0; j < edges.length; j++) {
+ var edge = g.edge(edges[j]);
+ edge.lineInterpolate = "basis";
+ }
+ // Adjust vertical separation between nodes
if (forJob) {
- clusterLabelSize = JobPageVizConstants.clusterLabelSize;
- stageClusterLabelSize = JobPageVizConstants.stageClusterLabelSize;
+ g.graph().rankSep = JobPageVizConstants.rankSep;
} else {
- clusterLabelSize = StagePageVizConstants.clusterLabelSize;
- stageClusterLabelSize = StagePageVizConstants.stageClusterLabelSize;
+ g.graph().rankSep = StagePageVizConstants.rankSep;
}
- svgContainer.selectAll("g.cluster").each(function() {
- var cluster = d3.select(this);
- var isStage = cluster.attr("id").indexOf(VizConstants.stageClusterPrefix) > -1;
- var labelSize = isStage ? stageClusterLabelSize : clusterLabelSize;
- drawClusterLabel(cluster, labelSize);
- });
-}
-
-/*
- * Helper function to draw a label for the given cluster element based on its name.
- *
- * In the process, we need to expand the bounding box to make room for the label.
- * We need to do this because dagre-d3 did not take this into account when it first
- * rendered the bounding boxes. Note that this means we need to adjust the view box
- * of the SVG afterwards since we shifted a few boxes around.
- */
-function drawClusterLabel(d3cluster, fontSize) {
- var cluster = d3cluster;
- var rect = d3cluster.select("rect");
- rect.attr("y", toFloat(rect.attr("y")) - fontSize);
- rect.attr("height", toFloat(rect.attr("height")) + fontSize);
- var labelX = toFloat(rect.attr("x")) + toFloat(rect.attr("width")) - fontSize / 2;
- var labelY = toFloat(rect.attr("y")) + fontSize * 1.5;
- var labelText = cluster.attr("name").replace(VizConstants.clusterPrefix, "");
- cluster.append("text")
- .attr("x", labelX)
- .attr("y", labelY)
- .attr("text-anchor", "end")
- .style("font-size", fontSize + "px")
- .text(labelText);
}
/*
@@ -444,7 +430,7 @@ function addTooltipsForRDDs(svgContainer) {
if (tooltipText) {
node.select("circle")
.attr("data-toggle", "tooltip")
- .attr("data-placement", "right")
+ .attr("data-placement", "bottom")
.attr("title", tooltipText)
}
});
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala b/core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala
index acbaba6791850..efb6b93cfc35d 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala
@@ -31,7 +31,7 @@ private[spark] object PythonUtils {
def sparkPythonPath: String = {
val pythonPath = new ArrayBuffer[String]
for (sparkHome <- sys.env.get("SPARK_HOME")) {
- pythonPath += Seq(sparkHome, "python").mkString(File.separator)
+ pythonPath += Seq(sparkHome, "python", "lib", "pyspark.zip").mkString(File.separator)
pythonPath += Seq(sparkHome, "python", "lib", "py4j-0.8.2.1-src.zip").mkString(File.separator)
}
pythonPath ++= SparkContext.jarOfObject(this)
@@ -53,4 +53,11 @@ private[spark] object PythonUtils {
def toSeq[T](cols: JList[T]): Seq[T] = {
cols.toList.toSeq
}
+
+ /**
+ * Convert a Java map of K, V into a Scala Map of K, V (for calling APIs that take varargs)
+ */
+ def toScalaMap[K, V](jm: java.util.Map[K, V]): Map[K, V] = {
+ jm.toMap
+ }
}
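For reference, a minimal standalone sketch of what the new toScalaMap helper does (the wrapper object and sample options below are made up; inside Spark the conversion relies on the JavaConversions implicits already imported in PythonUtils):

    import scala.collection.JavaConversions._

    object ToScalaMapSketch {
      // Same conversion as PythonUtils.toScalaMap: a java.util.Map handed over
      // by Py4J becomes an immutable Scala Map that Scala APIs can consume.
      def toScalaMap[K, V](jm: java.util.Map[K, V]): Map[K, V] = jm.toMap

      def main(args: Array[String]): Unit = {
        val options = new java.util.HashMap[String, String]()
        options.put("header", "true")
        options.put("sep", ",")
        println(toScalaMap(options))  // e.g. Map(header -> true, sep -> ,)
      }
    }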
diff --git a/core/src/main/scala/org/apache/spark/api/r/RBackend.scala b/core/src/main/scala/org/apache/spark/api/r/RBackend.scala
index 3a2c94bd9d875..0a91977928cee 100644
--- a/core/src/main/scala/org/apache/spark/api/r/RBackend.scala
+++ b/core/src/main/scala/org/apache/spark/api/r/RBackend.scala
@@ -18,7 +18,7 @@
package org.apache.spark.api.r
import java.io.{DataOutputStream, File, FileOutputStream, IOException}
-import java.net.{InetSocketAddress, ServerSocket}
+import java.net.{InetAddress, InetSocketAddress, ServerSocket}
import java.util.concurrent.TimeUnit
import io.netty.bootstrap.ServerBootstrap
@@ -65,7 +65,7 @@ private[spark] class RBackend {
}
})
- channelFuture = bootstrap.bind(new InetSocketAddress(0))
+ channelFuture = bootstrap.bind(new InetSocketAddress("localhost", 0))
channelFuture.syncUninterruptibly()
channelFuture.channel().localAddress().asInstanceOf[InetSocketAddress].getPort()
}
@@ -101,7 +101,7 @@ private[spark] object RBackend extends Logging {
try {
// bind to random port
val boundPort = sparkRBackend.init()
- val serverSocket = new ServerSocket(0, 1)
+ val serverSocket = new ServerSocket(0, 1, InetAddress.getByName("localhost"))
val listenPort = serverSocket.getLocalPort()
// tell the R process via temporary file
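The same loopback-only binding is applied to the sockets in RRDD.scala below. A standalone sketch of the pattern (plain JDK API, nothing Spark-specific):

    import java.net.{InetAddress, ServerSocket}

    object LoopbackBindSketch {
      def main(args: Array[String]): Unit = {
        // Port 0 asks the OS for a free port; backlog 1 allows one pending
        // connection; binding to the loopback address keeps the socket invisible
        // to other hosts, so only local processes (e.g. a forked R worker) can connect.
        val server = new ServerSocket(0, 1, InetAddress.getByName("localhost"))
        try {
          println(s"listening on localhost:${server.getLocalPort}")
        } finally {
          server.close()
        }
      }
    }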
diff --git a/core/src/main/scala/org/apache/spark/api/r/RRDD.scala b/core/src/main/scala/org/apache/spark/api/r/RRDD.scala
index 6fea5e1144f2f..06247f7e8b78c 100644
--- a/core/src/main/scala/org/apache/spark/api/r/RRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/r/RRDD.scala
@@ -18,7 +18,7 @@
package org.apache.spark.api.r
import java.io._
-import java.net.ServerSocket
+import java.net.{InetAddress, ServerSocket}
import java.util.{Map => JMap}
import scala.collection.JavaConversions._
@@ -55,7 +55,7 @@ private abstract class BaseRRDD[T: ClassTag, U: ClassTag](
val parentIterator = firstParent[T].iterator(partition, context)
// we expect two connections
- val serverSocket = new ServerSocket(0, 2)
+ val serverSocket = new ServerSocket(0, 2, InetAddress.getByName("localhost"))
val listenPort = serverSocket.getLocalPort()
// The stdout/stderr is shared by multiple tasks, because we use one daemon
@@ -414,7 +414,7 @@ private[r] object RRDD {
synchronized {
if (daemonChannel == null) {
// we expect one connection
- val serverSocket = new ServerSocket(0, 1)
+ val serverSocket = new ServerSocket(0, 1, InetAddress.getByName("localhost"))
val daemonPort = serverSocket.getLocalPort
errThread = createRProcess(rLibDir, daemonPort, "daemon.R")
// the socket used to send out the input of task
diff --git a/core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala b/core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala
index 53e18c4bcec23..c2ed43a5397d6 100644
--- a/core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala
@@ -18,9 +18,11 @@
package org.apache.spark.deploy
import java.net.URI
+import java.io.File
import scala.collection.mutable.ArrayBuffer
import scala.collection.JavaConversions._
+import scala.util.Try
import org.apache.spark.api.python.PythonUtils
import org.apache.spark.util.{RedirectThread, Utils}
@@ -81,16 +83,13 @@ object PythonRunner {
throw new IllegalArgumentException("Launching Python applications through " +
s"spark-submit is currently only supported for local files: $path")
}
- val windows = Utils.isWindows || testWindows
- var formattedPath = if (windows) Utils.formatWindowsPath(path) else path
-
- // Strip the URI scheme from the path
- formattedPath =
- new URI(formattedPath).getScheme match {
- case null => formattedPath
- case Utils.windowsDrive(d) if windows => formattedPath
- case _ => new URI(formattedPath).getPath
- }
+ // Resolve the path when there is no scheme or the scheme is local ("file" or "local"); other schemes are rejected below.
+ val uri = Try(new URI(path)).getOrElse(new File(path).toURI)
+ var formattedPath = uri.getScheme match {
+ case null => path
+ case "file" | "local" => uri.getPath
+ case _ => null
+ }
// Guard against malformed paths potentially throwing NPE
if (formattedPath == null) {
@@ -99,7 +98,9 @@ object PythonRunner {
// In Windows, the drive should not be prefixed with "/"
// For instance, python does not understand "/C:/path/to/sheep.py"
- formattedPath = if (windows) formattedPath.stripPrefix("/") else formattedPath
+ if (Utils.isWindows && formattedPath.matches("/[a-zA-Z]:/.*")) {
+ formattedPath = formattedPath.stripPrefix("/")
+ }
formattedPath
}
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
index b563034457a91..7fa75ac8c2b54 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -22,22 +22,22 @@ import java.lang.reflect.Method
import java.security.PrivilegedExceptionAction
import java.util.{Arrays, Comparator}
+import scala.collection.JavaConversions._
+import scala.concurrent.duration._
+import scala.language.postfixOps
+
import com.google.common.primitives.Longs
import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileStatus, FileSystem, Path, PathFilter}
import org.apache.hadoop.fs.FileSystem.Statistics
+import org.apache.hadoop.fs.{FileStatus, FileSystem, Path, PathFilter}
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.mapreduce.JobContext
import org.apache.hadoop.security.{Credentials, UserGroupInformation}
-import org.apache.spark.{Logging, SparkConf, SparkException}
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.util.Utils
-
-import scala.collection.JavaConversions._
-import scala.concurrent.duration._
-import scala.language.postfixOps
+import org.apache.spark.{Logging, SparkConf, SparkException}
/**
* :: DeveloperApi ::
@@ -199,13 +199,43 @@ class SparkHadoopUtil extends Logging {
* that file.
*/
def listLeafStatuses(fs: FileSystem, basePath: Path): Seq[FileStatus] = {
- def recurse(path: Path): Array[FileStatus] = {
- val (directories, leaves) = fs.listStatus(path).partition(_.isDir)
- leaves ++ directories.flatMap(f => listLeafStatuses(fs, f.getPath))
+ listLeafStatuses(fs, fs.getFileStatus(basePath))
+ }
+
+ /**
+ * Get [[FileStatus]] objects for all leaf children (files) under the given base path. If the
+ * given path points to a file, return a single-element collection containing [[FileStatus]] of
+ * that file.
+ */
+ def listLeafStatuses(fs: FileSystem, baseStatus: FileStatus): Seq[FileStatus] = {
+ def recurse(status: FileStatus): Seq[FileStatus] = {
+ val (directories, leaves) = fs.listStatus(status.getPath).partition(_.isDir)
+ leaves ++ directories.flatMap(f => listLeafStatuses(fs, f))
}
- val baseStatus = fs.getFileStatus(basePath)
- if (baseStatus.isDir) recurse(basePath) else Array(baseStatus)
+ if (baseStatus.isDir) recurse(baseStatus) else Seq(baseStatus)
+ }
+
+ def listLeafDirStatuses(fs: FileSystem, basePath: Path): Seq[FileStatus] = {
+ listLeafDirStatuses(fs, fs.getFileStatus(basePath))
+ }
+
+ def listLeafDirStatuses(fs: FileSystem, baseStatus: FileStatus): Seq[FileStatus] = {
+ def recurse(status: FileStatus): Seq[FileStatus] = {
+ val (directories, files) = fs.listStatus(status.getPath).partition(_.isDir)
+ val leaves = if (directories.isEmpty) Seq(status) else Seq.empty[FileStatus]
+ leaves ++ directories.flatMap(dir => listLeafDirStatuses(fs, dir))
+ }
+
+ assert(baseStatus.isDir)
+ recurse(baseStatus)
+ }
+
+ def globPath(pattern: Path): Seq[Path] = {
+ val fs = pattern.getFileSystem(conf)
+ Option(fs.globStatus(pattern)).map { statuses =>
+ statuses.map(_.getPath.makeQualified(fs.getUri, fs.getWorkingDirectory)).toSeq
+ }.getOrElse(Seq.empty[Path])
}
/**
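A rough illustration of how the new helpers are meant to be called from other Spark code (SparkHadoopUtil is private[spark], so this only compiles inside the project; the paths are made up):

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileSystem, Path}
    import org.apache.spark.deploy.SparkHadoopUtil

    object LeafListingSketch {
      def main(args: Array[String]): Unit = {
        val util = SparkHadoopUtil.get
        val fs = FileSystem.get(new Configuration())

        // Every leaf file under the base path (or the file itself, if the
        // path points to a file rather than a directory).
        val leafFiles = util.listLeafStatuses(fs, new Path("/data/events"))

        // Every bottom-level directory under the base path.
        val leafDirs = util.listLeafDirStatuses(fs, new Path("/data/events"))

        // Paths matching a glob pattern; Seq.empty when nothing matches.
        val matched = util.globPath(new Path("/data/events/2015-*"))

        println(s"${leafFiles.size} files, ${leafDirs.size} leaf dirs, ${matched.size} matches")
      }
    }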
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index 7dad30ecbdd2f..02a94baf372d9 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -1523,13 +1523,15 @@ abstract class RDD[T: ClassTag](
* has completed (therefore the RDD has been materialized and potentially stored in memory).
* doCheckpoint() is called recursively on the parent RDDs.
*/
- private[spark] def doCheckpoint() {
- if (!doCheckpointCalled) {
- doCheckpointCalled = true
- if (checkpointData.isDefined) {
- checkpointData.get.doCheckpoint()
- } else {
- dependencies.foreach(_.rdd.doCheckpoint())
+ private[spark] def doCheckpoint(): Unit = {
+ RDDOperationScope.withScope(sc, "checkpoint", false, true) {
+ if (!doCheckpointCalled) {
+ doCheckpointCalled = true
+ if (checkpointData.isDefined) {
+ checkpointData.get.doCheckpoint()
+ } else {
+ dependencies.foreach(_.rdd.doCheckpoint())
+ }
}
}
}
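The behavioral change is confined to the UI: with the wrapping scope, the RDDs materialized while writing a checkpoint are grouped under their own "checkpoint" scope in the DAG visualization. A hypothetical driver program that would exercise it (directory and data are made up):

    import org.apache.spark.{SparkConf, SparkContext}

    object CheckpointScopeSketch {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("checkpoint-scope").setMaster("local[2]")
        val sc = new SparkContext(conf)
        sc.setCheckpointDir("/tmp/checkpoints")

        val rdd = sc.parallelize(1 to 1000).map(_ * 2)
        rdd.checkpoint()
        rdd.count()  // runs the job; doCheckpoint() fires when it completes

        sc.stop()
      }
    }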
diff --git a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala
index 6a0f5c5d16daa..ad16becde85dd 100644
--- a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala
+++ b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala
@@ -194,11 +194,7 @@ private[spark] object UIUtils extends Logging {
{tab.name}
}
- val helpButton: Seq[Node] = helpText.map { helpText =>
-   <sup>
-     <span data-toggle="tooltip" data-placement="bottom" title={helpText}>(?)</span>
-   </sup>
- }.getOrElse(Seq.empty)
+ val helpButton: Seq[Node] = helpText.map(tooltip(_, "bottom")).getOrElse(Seq.empty)
@@ -360,7 +356,7 @@ private[spark] object UIUtils extends Logging {
{
graphs.map { g =>
}
+ def tooltip(text: String, position: String): Seq[Node] = {
+   <sup>
+     <span data-toggle="tooltip" data-placement={position} title={text}>(?)</span>
+   </sup>
+ }
+
/** Return a script element that automatically expands the DAG visualization on page load. */
def expandDagVizOnLoad(forJob: Boolean): Seq[Node] = {
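A hypothetical call site for the new tooltip helper (UIUtils is private[spark]; the help text is made up):

    import scala.xml.Node
    import org.apache.spark.ui.UIUtils

    object TooltipSketch {
      // Renders the "(?)" superscript with a Bootstrap tooltip placed below it.
      def helpIcon: Seq[Node] = UIUtils.tooltip("Shows the DAG of stages for this job", "bottom")
    }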