Commit 4ae834b

Merge branch 'master' of https://github.com/apache/spark into https
2 parents: 3b01d3a + 90f73fc

159 files changed: +3932, −2091 lines

bin/compute-classpath.cmd

Lines changed: 1 addition & 1 deletion
@@ -38,7 +38,7 @@ if exist "%FWDIR%conf\spark-env.cmd" call "%FWDIR%conf\spark-env.cmd"
 rem Build up classpath
 set CLASSPATH=%SPARK_CLASSPATH%;%SPARK_SUBMIT_CLASSPATH%
 
-if "x%SPARK_CONF_DIR%"!="x" (
+if not "x%SPARK_CONF_DIR%"=="x" (
   set CLASSPATH=%CLASSPATH%;%SPARK_CONF_DIR%
 ) else (
   set CLASSPATH=%CLASSPATH%;%FWDIR%conf

bin/pyspark

Lines changed: 38 additions & 13 deletions
@@ -50,22 +50,47 @@ fi
 
 . "$FWDIR"/bin/load-spark-env.sh
 
-# Figure out which Python executable to use
+# In Spark <= 1.1, setting IPYTHON=1 would cause the driver to be launched using the `ipython`
+# executable, while the worker would still be launched using PYSPARK_PYTHON.
+#
+# In Spark 1.2, we removed the documentation of the IPYTHON and IPYTHON_OPTS variables and added
+# PYSPARK_DRIVER_PYTHON and PYSPARK_DRIVER_PYTHON_OPTS to allow IPython to be used for the driver.
+# Now, users can simply set PYSPARK_DRIVER_PYTHON=ipython to use IPython and set
+# PYSPARK_DRIVER_PYTHON_OPTS to pass options when starting the Python driver
+# (e.g. PYSPARK_DRIVER_PYTHON_OPTS='notebook'). This supports full customization of the IPython
+# and executor Python executables.
+#
+# For backwards-compatibility, we retain the old IPYTHON and IPYTHON_OPTS variables.
+
+# Determine the Python executable to use if PYSPARK_PYTHON or PYSPARK_DRIVER_PYTHON isn't set:
+if hash python2.7 2>/dev/null; then
+  # Attempt to use Python 2.7, if installed:
+  DEFAULT_PYTHON="python2.7"
+else
+  DEFAULT_PYTHON="python"
+fi
+
+# Determine the Python executable to use for the driver:
+if [[ -n "$IPYTHON_OPTS" || "$IPYTHON" == "1" ]]; then
+  # If IPython options are specified, assume user wants to run IPython
+  # (for backwards-compatibility)
+  PYSPARK_DRIVER_PYTHON_OPTS="$PYSPARK_DRIVER_PYTHON_OPTS $IPYTHON_OPTS"
+  PYSPARK_DRIVER_PYTHON="ipython"
+elif [[ -z "$PYSPARK_DRIVER_PYTHON" ]]; then
+  PYSPARK_DRIVER_PYTHON="${PYSPARK_PYTHON:-"$DEFAULT_PYTHON"}"
+fi
+
+# Determine the Python executable to use for the executors:
 if [[ -z "$PYSPARK_PYTHON" ]]; then
-  if [[ "$IPYTHON" = "1" || -n "$IPYTHON_OPTS" ]]; then
-    # for backward compatibility
-    PYSPARK_PYTHON="ipython"
+  if [[ $PYSPARK_DRIVER_PYTHON == *ipython* && $DEFAULT_PYTHON != "python2.7" ]]; then
+    echo "IPython requires Python 2.7+; please install python2.7 or set PYSPARK_PYTHON" 1>&2
+    exit 1
   else
-    PYSPARK_PYTHON="python"
+    PYSPARK_PYTHON="$DEFAULT_PYTHON"
   fi
 fi
 export PYSPARK_PYTHON
 
-if [[ -z "$PYSPARK_PYTHON_OPTS" && -n "$IPYTHON_OPTS" ]]; then
-  # for backward compatibility
-  PYSPARK_PYTHON_OPTS="$IPYTHON_OPTS"
-fi
-
 # Add the PySpark classes to the Python path:
 export PYTHONPATH="$SPARK_HOME/python/:$PYTHONPATH"
 export PYTHONPATH="$SPARK_HOME/python/lib/py4j-0.8.2.1-src.zip:$PYTHONPATH"
@@ -93,9 +118,9 @@ if [[ -n "$SPARK_TESTING" ]]; then
   unset YARN_CONF_DIR
   unset HADOOP_CONF_DIR
   if [[ -n "$PYSPARK_DOC_TEST" ]]; then
-    exec "$PYSPARK_PYTHON" -m doctest $1
+    exec "$PYSPARK_DRIVER_PYTHON" -m doctest $1
   else
-    exec "$PYSPARK_PYTHON" $1
+    exec "$PYSPARK_DRIVER_PYTHON" $1
   fi
   exit
 fi
@@ -111,5 +136,5 @@ if [[ "$1" =~ \.py$ ]]; then
 else
   # PySpark shell requires special handling downstream
   export PYSPARK_SHELL=1
-  exec "$PYSPARK_PYTHON" $PYSPARK_PYTHON_OPTS
+  exec "$PYSPARK_DRIVER_PYTHON" $PYSPARK_DRIVER_PYTHON_OPTS
 fi
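
The new variables above can be exercised directly from the shell. A minimal usage sketch, assuming IPython is installed and the commands are run from a Spark checkout; the "notebook" option mirrors the example given in the script's own comment:

    # Run the PySpark shell with IPython as the driver; executors still use PYSPARK_PYTHON.
    PYSPARK_DRIVER_PYTHON=ipython ./bin/pyspark

    # Pass driver-only options, e.g. to start the IPython notebook:
    PYSPARK_DRIVER_PYTHON=ipython PYSPARK_DRIVER_PYTHON_OPTS="notebook" ./bin/pyspark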

core/src/main/resources/org/apache/spark/ui/static/webui.css

Lines changed: 5 additions & 0 deletions
@@ -51,6 +51,11 @@ table.sortable thead {
   cursor: pointer;
 }
 
+table.sortable td {
+  word-wrap: break-word;
+  max-width: 600px;
+}
+
 .progress {
   margin-bottom: 0px; position: relative
 }

core/src/main/scala/org/apache/spark/CacheManager.scala

Lines changed: 0 additions & 2 deletions
@@ -168,8 +168,6 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
           arr.iterator.asInstanceOf[Iterator[T]]
         case Right(it) =>
           // There is not enough space to cache this partition in memory
-          logWarning(s"Not enough space to cache partition $key in memory! " +
-            s"Free memory is ${blockManager.memoryStore.freeMemory} bytes.")
           val returnValues = it.asInstanceOf[Iterator[T]]
           if (putLevel.useDisk) {
             logWarning(s"Persisting partition $key to disk instead.")

core/src/main/scala/org/apache/spark/SparkEnv.scala

Lines changed: 8 additions & 11 deletions
@@ -43,9 +43,8 @@ import org.apache.spark.util.{AkkaUtils, Utils}
  * :: DeveloperApi ::
  * Holds all the runtime environment objects for a running Spark instance (either master or worker),
  * including the serializer, Akka actor system, block manager, map output tracker, etc. Currently
- * Spark code finds the SparkEnv through a thread-local variable, so each thread that accesses these
- * objects needs to have the right SparkEnv set. You can get the current environment with
- * SparkEnv.get (e.g. after creating a SparkContext) and set it with SparkEnv.set.
+ * Spark code finds the SparkEnv through a global variable, so all the threads can access the same
+ * SparkEnv. It can be accessed by SparkEnv.get (e.g. after creating a SparkContext).
  *
  * NOTE: This is not intended for external use. This is exposed for Shark and may be made private
  * in a future release.
@@ -119,30 +118,28 @@ class SparkEnv (
 }
 
 object SparkEnv extends Logging {
-  private val env = new ThreadLocal[SparkEnv]
-  @volatile private var lastSetSparkEnv : SparkEnv = _
+  @volatile private var env: SparkEnv = _
 
   private[spark] val driverActorSystemName = "sparkDriver"
   private[spark] val executorActorSystemName = "sparkExecutor"
 
   def set(e: SparkEnv) {
-    lastSetSparkEnv = e
-    env.set(e)
+    env = e
   }
 
   /**
-   * Returns the ThreadLocal SparkEnv, if non-null. Else returns the SparkEnv
-   * previously set in any thread.
+   * Returns the SparkEnv.
   */
  def get: SparkEnv = {
-    Option(env.get()).getOrElse(lastSetSparkEnv)
+    env
  }
 
  /**
   * Returns the ThreadLocal SparkEnv.
   */
+  @deprecated("Use SparkEnv.get instead", "1.2")
  def getThreadLocal: SparkEnv = {
-    env.get()
+    env
  }
 
  private[spark] def create(

core/src/main/scala/org/apache/spark/TestUtils.scala

Lines changed: 3 additions & 2 deletions
@@ -26,6 +26,8 @@ import scala.collection.JavaConversions._
 import javax.tools.{JavaFileObject, SimpleJavaFileObject, ToolProvider}
 import com.google.common.io.Files
 
+import org.apache.spark.util.Utils
+
 /**
  * Utilities for tests. Included in main codebase since it's used by multiple
  * projects.
@@ -42,8 +44,7 @@ private[spark] object TestUtils {
   * in order to avoid interference between tests.
   */
  def createJarWithClasses(classNames: Seq[String], value: String = ""): URL = {
-    val tempDir = Files.createTempDir()
-    tempDir.deleteOnExit()
+    val tempDir = Utils.createTempDir()
    val files = for (name <- classNames) yield createCompiledClass(name, tempDir, value)
    val jarFile = new File(tempDir, "testJar-%s.jar".format(System.currentTimeMillis()))
    createJar(files, jarFile)

core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala

Lines changed: 5 additions & 1 deletion
@@ -196,7 +196,6 @@ private[spark] class PythonRDD(
 
     override def run(): Unit = Utils.logUncaughtExceptions {
       try {
-        SparkEnv.set(env)
         val stream = new BufferedOutputStream(worker.getOutputStream, bufferSize)
         val dataOut = new DataOutputStream(stream)
         // Partition index
@@ -248,6 +247,11 @@ private[spark] class PythonRDD(
           // will kill the whole executor (see org.apache.spark.executor.Executor).
           _exception = e
           worker.shutdownOutput()
+      } finally {
+        // Release memory used by this thread for shuffles
+        env.shuffleMemoryManager.releaseMemoryForThisThread()
+        // Release memory used by this thread for unrolling blocks
+        env.blockManager.memoryStore.releaseUnrollMemoryForThisThread()
       }
     }
   }

core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala

Lines changed: 6 additions & 2 deletions
@@ -108,10 +108,12 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String
       serverSocket = new ServerSocket(0, 1, InetAddress.getByAddress(Array(127, 0, 0, 1)))
 
       // Create and start the worker
-      val pb = new ProcessBuilder(Seq(pythonExec, "-u", "-m", "pyspark.worker"))
+      val pb = new ProcessBuilder(Seq(pythonExec, "-m", "pyspark.worker"))
       val workerEnv = pb.environment()
       workerEnv.putAll(envVars)
       workerEnv.put("PYTHONPATH", pythonPath)
+      // This is equivalent to setting the -u flag; we use it because ipython doesn't support -u:
+      workerEnv.put("PYTHONUNBUFFERED", "YES")
       val worker = pb.start()
 
       // Redirect worker stdout and stderr
@@ -149,10 +151,12 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String
 
     try {
       // Create and start the daemon
-      val pb = new ProcessBuilder(Seq(pythonExec, "-u", "-m", "pyspark.daemon"))
+      val pb = new ProcessBuilder(Seq(pythonExec, "-m", "pyspark.daemon"))
       val workerEnv = pb.environment()
       workerEnv.putAll(envVars)
       workerEnv.put("PYTHONPATH", pythonPath)
+      // This is equivalent to setting the -u flag; we use it because ipython doesn't support -u:
+      workerEnv.put("PYTHONUNBUFFERED", "YES")
       daemon = pb.start()
 
       val in = new DataInputStream(daemon.getInputStream)
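
The PYTHONUNBUFFERED change above relies on the equivalence noted in the comment: CPython treats any non-empty value of the variable like the -u flag, while the environment-variable form also works when the worker executable is ipython, which does not accept -u. A rough shell illustration, where my_worker.py is a hypothetical stand-in for the real pyspark.worker module:

    # Old approach: force unbuffered output via the interpreter flag (not accepted by ipython).
    python -u my_worker.py

    # New approach: the environment variable has the same effect, set before launching the worker.
    PYTHONUNBUFFERED=YES python my_worker.py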

core/src/main/scala/org/apache/spark/deploy/Client.scala

Lines changed: 1 addition & 1 deletion
@@ -130,7 +130,7 @@ private class ClientActor(driverArgs: ClientArguments, conf: SparkConf)
       println(s"Error connecting to master ${driverArgs.master} ($remoteAddress), exiting.")
       System.exit(-1)
 
-    case AssociationErrorEvent(cause, _, remoteAddress, _) =>
+    case AssociationErrorEvent(cause, _, remoteAddress, _, _) =>
       println(s"Error connecting to master ${driverArgs.master} ($remoteAddress), exiting.")
       println(s"Cause was: $cause")
       System.exit(-1)

core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala

Lines changed: 3 additions & 1 deletion
@@ -34,7 +34,8 @@ object PythonRunner {
     val pythonFile = args(0)
     val pyFiles = args(1)
     val otherArgs = args.slice(2, args.length)
-    val pythonExec = sys.env.get("PYSPARK_PYTHON").getOrElse("python") // TODO: get this from conf
+    val pythonExec =
+      sys.env.getOrElse("PYSPARK_DRIVER_PYTHON", sys.env.getOrElse("PYSPARK_PYTHON", "python"))
 
     // Format python file paths before adding them to the PYTHONPATH
     val formattedPythonFile = formatPath(pythonFile)
@@ -57,6 +58,7 @@ object PythonRunner {
     val builder = new ProcessBuilder(Seq(pythonExec, formattedPythonFile) ++ otherArgs)
     val env = builder.environment()
     env.put("PYTHONPATH", pythonPath)
+    // This is equivalent to setting the -u flag; we use it because ipython doesn't support -u:
    env.put("PYTHONUNBUFFERED", "YES") // value is needed to be set to a non-empty string
    env.put("PYSPARK_GATEWAY_PORT", "" + gatewayServer.getListeningPort)
    builder.redirectErrorStream(true) // Ugly but needed for stdout and stderr to synchronize
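
For standalone Python applications launched through PythonRunner, the driver executable now resolves as PYSPARK_DRIVER_PYTHON, then PYSPARK_PYTHON, then plain python, matching the lookup chain above. A usage sketch, with my_app.py as a placeholder script name:

    # The driver runs under ipython, while executors keep using PYSPARK_PYTHON (here python2.7).
    PYSPARK_DRIVER_PYTHON=ipython PYSPARK_PYTHON=python2.7 ./bin/spark-submit my_app.py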
