
commit 581f7bf
Author:  Andrew Or
Parents: 9e0d1af + 62a93a1

    Merge branch 'master' of github.com:apache/spark into rest

    Conflicts:
        core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala

55 files changed, 1490 insertions(+), 349 deletions(-)

README.md (1 addition, 1 deletion)

@@ -26,7 +26,7 @@ To build Spark and its example programs, run:
 
 (You do not need to do this if you downloaded a pre-built package.)
 More detailed documentation is available from the project site, at
-["Building Spark with Maven"](http://spark.apache.org/docs/latest/building-spark.html).
+["Building Spark"](http://spark.apache.org/docs/latest/building-spark.html).
 
 ## Interactive Scala Shell
 

bin/compute-classpath.sh (3 additions, 1 deletion)

@@ -50,8 +50,8 @@ fi
 if [ -n "$SPARK_PREPEND_CLASSES" ]; then
   echo "NOTE: SPARK_PREPEND_CLASSES is set, placing locally compiled Spark"\
     "classes ahead of assembly." >&2
+  # Spark classes
   CLASSPATH="$CLASSPATH:$FWDIR/core/target/scala-$SPARK_SCALA_VERSION/classes"
-  CLASSPATH="$CLASSPATH:$FWDIR/core/target/jars/*"
   CLASSPATH="$CLASSPATH:$FWDIR/repl/target/scala-$SPARK_SCALA_VERSION/classes"
   CLASSPATH="$CLASSPATH:$FWDIR/mllib/target/scala-$SPARK_SCALA_VERSION/classes"
   CLASSPATH="$CLASSPATH:$FWDIR/bagel/target/scala-$SPARK_SCALA_VERSION/classes"
@@ -63,6 +63,8 @@ if [ -n "$SPARK_PREPEND_CLASSES" ]; then
   CLASSPATH="$CLASSPATH:$FWDIR/sql/hive/target/scala-$SPARK_SCALA_VERSION/classes"
   CLASSPATH="$CLASSPATH:$FWDIR/sql/hive-thriftserver/target/scala-$SPARK_SCALA_VERSION/classes"
   CLASSPATH="$CLASSPATH:$FWDIR/yarn/stable/target/scala-$SPARK_SCALA_VERSION/classes"
+  # Jars for shaded deps in their original form (copied here during build)
+  CLASSPATH="$CLASSPATH:$FWDIR/core/target/jars/*"
 fi
 
 # Use spark-assembly jar from either RELEASE or assembly directory

core/pom.xml (20 additions, 2 deletions)

@@ -94,22 +94,35 @@
       <groupId>org.apache.curator</groupId>
       <artifactId>curator-recipes</artifactId>
     </dependency>
+
+    <!-- Jetty dependencies promoted to compile here so they are shaded
+         and inlined into spark-core jar -->
     <dependency>
       <groupId>org.eclipse.jetty</groupId>
       <artifactId>jetty-plus</artifactId>
+      <scope>compile</scope>
     </dependency>
     <dependency>
       <groupId>org.eclipse.jetty</groupId>
       <artifactId>jetty-security</artifactId>
+      <scope>compile</scope>
     </dependency>
     <dependency>
      <groupId>org.eclipse.jetty</groupId>
       <artifactId>jetty-util</artifactId>
+      <scope>compile</scope>
     </dependency>
     <dependency>
       <groupId>org.eclipse.jetty</groupId>
       <artifactId>jetty-server</artifactId>
+      <scope>compile</scope>
     </dependency>
+    <dependency>
+      <groupId>org.eclipse.jetty</groupId>
+      <artifactId>jetty-http</artifactId>
+      <scope>compile</scope>
+    </dependency>
+
     <dependency>
       <groupId>org.apache.commons</groupId>
       <artifactId>commons-lang3</artifactId>
@@ -356,19 +369,24 @@
         <groupId>org.apache.maven.plugins</groupId>
         <artifactId>maven-dependency-plugin</artifactId>
         <executions>
+          <!-- When using SPARK_PREPEND_CLASSES Spark classes compiled locally don't use
+               shaded deps. So here we store jars in their original form which are added
+               when the classpath is computed. -->
           <execution>
             <id>copy-dependencies</id>
            <phase>package</phase>
             <goals>
               <goal>copy-dependencies</goal>
             </goals>
-           <configuration>
+            <configuration>
              <outputDirectory>${project.build.directory}</outputDirectory>
              <overWriteReleases>false</overWriteReleases>
              <overWriteSnapshots>false</overWriteSnapshots>
              <overWriteIfNewer>true</overWriteIfNewer>
              <useSubDirectoryPerType>true</useSubDirectoryPerType>
-             <includeArtifactIds>guava</includeArtifactIds>
+             <includeArtifactIds>
+               guava,jetty-io,jetty-http,jetty-plus,jetty-util,jetty-server
+             </includeArtifactIds>
              <silent>true</silent>
            </configuration>
          </execution>

core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala (8 additions, 6 deletions)

@@ -49,6 +49,7 @@ import org.apache.spark.scheduler._
  * spark.dynamicAllocation.enabled - Whether this feature is enabled
  * spark.dynamicAllocation.minExecutors - Lower bound on the number of executors
  * spark.dynamicAllocation.maxExecutors - Upper bound on the number of executors
+ * spark.dynamicAllocation.initialExecutors - Number of executors to start with
  *
  * spark.dynamicAllocation.schedulerBacklogTimeout (M) -
  *   If there are backlogged tasks for this duration, add new executors
@@ -70,9 +71,10 @@ private[spark] class ExecutorAllocationManager(
 
   import ExecutorAllocationManager._
 
-  // Lower and upper bounds on the number of executors. These are required.
-  private val minNumExecutors = conf.getInt("spark.dynamicAllocation.minExecutors", -1)
-  private val maxNumExecutors = conf.getInt("spark.dynamicAllocation.maxExecutors", -1)
+  // Lower and upper bounds on the number of executors.
+  private val minNumExecutors = conf.getInt("spark.dynamicAllocation.minExecutors", 0)
+  private val maxNumExecutors = conf.getInt("spark.dynamicAllocation.maxExecutors",
+    Integer.MAX_VALUE)
 
   // How long there must be backlogged tasks for before an addition is triggered
   private val schedulerBacklogTimeout = conf.getLong(
@@ -132,10 +134,10 @@
    */
   private def validateSettings(): Unit = {
     if (minNumExecutors < 0 || maxNumExecutors < 0) {
-      throw new SparkException("spark.dynamicAllocation.{min/max}Executors must be set!")
+      throw new SparkException("spark.dynamicAllocation.{min/max}Executors must be positive!")
     }
-    if (minNumExecutors == 0 || maxNumExecutors == 0) {
-      throw new SparkException("spark.dynamicAllocation.{min/max}Executors cannot be 0!")
+    if (maxNumExecutors == 0) {
+      throw new SparkException("spark.dynamicAllocation.maxExecutors cannot be 0!")
     }
     if (minNumExecutors > maxNumExecutors) {
       throw new SparkException(s"spark.dynamicAllocation.minExecutors ($minNumExecutors) must " +

core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala (1 addition, 1 deletion)

@@ -26,7 +26,7 @@ import org.apache.spark.api.python.PythonUtils
 import org.apache.spark.util.{RedirectThread, Utils}
 
 /**
- * A main class used by spark-submit to launch Python applications. It executes python as a
+ * A main class used to launch Python applications. It executes python as a
  * subprocess and then has it connect back to the JVM to access system properties, etc.
  */
 object PythonRunner {

core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala (2 additions, 2 deletions)

@@ -141,7 +141,7 @@ class SparkHadoopUtil extends Logging {
       val baselineBytesRead = f()
       Some(() => f() - baselineBytesRead)
     } catch {
-      case e: NoSuchMethodException => {
+      case e @ (_: NoSuchMethodException | _: ClassNotFoundException) => {
         logDebug("Couldn't find method for retrieving thread-level FileSystem input data", e)
         None
       }
@@ -163,7 +163,7 @@
       val baselineBytesWritten = f()
       Some(() => f() - baselineBytesWritten)
     } catch {
-      case e: NoSuchMethodException => {
+      case e @ (_: NoSuchMethodException | _: ClassNotFoundException) => {
         logDebug("Couldn't find method for retrieving thread-level FileSystem output data", e)
         None
       }
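The catch clauses above now handle two exception types with a single case bound to `e`. A small, self-contained illustration of that Scala pattern; the object and helper names below are hypothetical and not part of the commit:

object MultiCatchExample {
  // Returns a delta-reader if the reflective lookup succeeds, None otherwise.
  def tryBuildReader(lookup: () => Long): Option[() => Long] =
    try {
      val baseline = lookup()
      Some(() => lookup() - baseline)
    } catch {
      // One handler for either failure mode, bound to `e` so it can be logged.
      case e @ (_: NoSuchMethodException | _: ClassNotFoundException) =>
        println(s"Couldn't find method for retrieving thread-level data: $e")
        None
    }

  def main(args: Array[String]): Unit = {
    val missing: () => Long = () => throw new ClassNotFoundException("org.example.Missing")
    assert(tryBuildReader(missing).isEmpty)     // falls back to None
    assert(tryBuildReader(() => 42L).isDefined) // succeeds when the lookup works
  }
}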

core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala (51 additions, 7 deletions)

@@ -23,6 +23,8 @@ import java.net.URL
 
 import scala.collection.mutable.{ArrayBuffer, HashMap, Map}
 
+import org.apache.hadoop.fs.Path
+
 import org.apache.spark.deploy.rest._
 import org.apache.spark.executor.ExecutorURLClassLoader
 import org.apache.spark.util.Utils
@@ -203,21 +205,38 @@
       }
     }
 
+    val isYarnCluster = clusterManager == YARN && deployMode == CLUSTER
+
+    // Require all python files to be local, so we can add them to the PYTHONPATH
+    // In YARN cluster mode, python files are distributed as regular files, which can be non-local
+    if (args.isPython && !isYarnCluster) {
+      if (Utils.nonLocalPaths(args.primaryResource).nonEmpty) {
+        printErrorAndExit(s"Only local python files are supported: $args.primaryResource")
+      }
+      val nonLocalPyFiles = Utils.nonLocalPaths(args.pyFiles).mkString(",")
+      if (nonLocalPyFiles.nonEmpty) {
+        printErrorAndExit(s"Only local additional python files are supported: $nonLocalPyFiles")
+      }
+    }
+
     // The following modes are not supported or applicable
     (clusterManager, deployMode) match {
       case (MESOS, CLUSTER) =>
         printErrorAndExit("Cluster deploy mode is currently not supported for Mesos clusters.")
-      case (_, CLUSTER) if args.isPython =>
-        printErrorAndExit("Cluster deploy mode is currently not supported for python applications.")
+      case (STANDALONE, CLUSTER) if args.isPython =>
+        printErrorAndExit("Cluster deploy mode is currently not supported for python " +
+          "applications on standalone clusters.")
       case (_, CLUSTER) if isShell(args.primaryResource) =>
         printErrorAndExit("Cluster deploy mode is not applicable to Spark shells.")
       case (_, CLUSTER) if isSqlShell(args.mainClass) =>
         printErrorAndExit("Cluster deploy mode is not applicable to Spark SQL shell.")
+      case (_, CLUSTER) if isThriftServer(args.mainClass) =>
+        printErrorAndExit("Cluster deploy mode is not applicable to Spark Thrift server.")
       case _ =>
     }
 
     // If we're running a python app, set the main class to our specific python runner
-    if (args.isPython) {
+    if (args.isPython && deployMode == CLIENT) {
       if (args.primaryResource == PYSPARK_SHELL) {
         args.mainClass = "py4j.GatewayServer"
         args.childArgs = ArrayBuffer("--die-on-broken-pipe", "0")
@@ -234,6 +253,13 @@
       }
     }
 
+    // In yarn-cluster mode for a python app, add primary resource and pyFiles to files
+    // that can be distributed with the job
+    if (args.isPython && isYarnCluster) {
+      args.files = mergeFileLists(args.files, args.primaryResource)
+      args.files = mergeFileLists(args.files, args.pyFiles)
+    }
+
     // Special flag to avoid deprecation warnings at the client
     sysProps("SPARK_SUBMIT") = "true"
 
@@ -311,7 +337,6 @@
     // Add the application jar automatically so the user doesn't have to call sc.addJar
     // For YARN cluster mode, the jar is already distributed on each node as "app.jar"
     // For python files, the primary resource is already distributed as a regular file
-    val isYarnCluster = clusterManager == YARN && deployMode == CLUSTER
     if (!isYarnCluster && !args.isPython) {
       var jars = sysProps.get("spark.jars").map(x => x.split(",").toSeq).getOrElse(Seq.empty)
       if (isUserJar(args.primaryResource)) {
@@ -337,10 +362,22 @@
     // In yarn-cluster mode, use yarn.Client as a wrapper around the user class
     if (isYarnCluster) {
       childMainClass = "org.apache.spark.deploy.yarn.Client"
-      if (args.primaryResource != SPARK_INTERNAL) {
-        childArgs += ("--jar", args.primaryResource)
+      if (args.isPython) {
+        val mainPyFile = new Path(args.primaryResource).getName
+        childArgs += ("--primary-py-file", mainPyFile)
+        if (args.pyFiles != null) {
+          // These files will be distributed to each machine's working directory, so strip the
+          // path prefix
+          val pyFilesNames = args.pyFiles.split(",").map(p => (new Path(p)).getName).mkString(",")
+          childArgs += ("--py-files", pyFilesNames)
+        }
+        childArgs += ("--class", "org.apache.spark.deploy.PythonRunner")
+      } else {
+        if (args.primaryResource != SPARK_INTERNAL) {
+          childArgs += ("--jar", args.primaryResource)
+        }
+        childArgs += ("--class", args.mainClass)
       }
-      childArgs += ("--class", args.mainClass)
       if (args.childArgs != null) {
        args.childArgs.foreach { arg => childArgs += ("--arg", arg) }
       }
@@ -504,6 +541,13 @@
     mainClass == "org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver"
   }
 
+  /**
+   * Return whether the given main class represents a thrift server.
+   */
+  private[spark] def isThriftServer(mainClass: String): Boolean = {
+    mainClass == "org.apache.spark.sql.hive.thriftserver.HiveThriftServer2"
+  }
+
   /**
    * Return whether the given primary resource requires running python.
   */
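In yarn-cluster mode SparkSubmit now wraps Python apps with yarn.Client, passing only the base names of the primary .py file and any --py-files, since YARN places distributed files directly in each container's working directory. A rough, self-contained sketch of that argument rewriting; it uses a plain string split instead of Hadoop's Path.getName, and the object and method names are illustrative only:

import scala.collection.mutable.ArrayBuffer

object PyYarnClusterArgsSketch {
  // Keep just the file name, mirroring what new Path(p).getName does in the diff above.
  private def baseName(p: String): String = p.split("/").last

  def childArgsFor(primaryResource: String, pyFiles: Option[String]): Seq[String] = {
    val childArgs = ArrayBuffer[String]()
    childArgs ++= Seq("--primary-py-file", baseName(primaryResource))
    pyFiles.foreach { files =>
      // Strip directory/scheme prefixes from every comma-separated entry.
      childArgs ++= Seq("--py-files", files.split(",").map(baseName).mkString(","))
    }
    // PythonRunner becomes the user class that yarn.Client launches.
    childArgs ++= Seq("--class", "org.apache.spark.deploy.PythonRunner")
    childArgs.toSeq
  }

  def main(args: Array[String]): Unit = {
    println(childArgsFor("hdfs:///apps/pi.py", Some("hdfs:///apps/lib1.py,/home/me/lib2.py")))
    // List(--primary-py-file, pi.py, --py-files, lib1.py,lib2.py, --class, org.apache.spark.deploy.PythonRunner)
  }
}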

core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala (0 additions, 12 deletions)

@@ -207,18 +207,6 @@ private[spark] class SparkSubmitArguments(args: Seq[String], env: Map[String, St
       SparkSubmit.printErrorAndExit("--py-files given but primary resource is not a Python script")
     }
 
-    // Require all python files to be local, so we can add them to the PYTHONPATH
-    if (isPython) {
-      if (Utils.nonLocalPaths(primaryResource).nonEmpty) {
-        SparkSubmit.printErrorAndExit(s"Only local python files are supported: $primaryResource")
-      }
-      val nonLocalPyFiles = Utils.nonLocalPaths(pyFiles).mkString(",")
-      if (nonLocalPyFiles.nonEmpty) {
-        SparkSubmit.printErrorAndExit(
-          s"Only local additional python files are supported: $nonLocalPyFiles")
-      }
-    }
-
     if (master.startsWith("yarn")) {
       val hasHadoopEnv = env.contains("HADOOP_CONF_DIR") || env.contains("YARN_CONF_DIR")
       if (!hasHadoopEnv && !Utils.isTesting) {
