Skip to content

Commit 5e30fc3

Browse files
committed
Merge pull request #97 from markhamstra/csd-1.5
Duplicate artifact testing
2 parents 5891760 + b9ba9e4 commit 5e30fc3

File tree

5 files changed

+28
-42
lines changed

5 files changed

+28
-42
lines changed

pom.xml

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1983,13 +1983,12 @@
19831983
<artifactId>maven-source-plugin</artifactId>
19841984
<version>2.4</version>
19851985
<configuration>
1986-
<attach>false</attach>
1986+
<attach>true</attach>
19871987
</configuration>
19881988
<executions>
19891989
<execution>
19901990
<id>attach-sources</id>
19911991
<goals>
1992-
<goal>jar-no-fork</goal>
19931992
<goal>test-jar-no-fork</goal>
19941993
</goals>
19951994
</execution>

yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala

Lines changed: 1 addition & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -81,25 +81,7 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf)
8181
.orNull
8282
// If dynamic allocation is enabled, start at the configured initial number of executors.
8383
// Default to minExecutors if no initialExecutors is set.
84-
if (isDynamicAllocationEnabled) {
85-
val minExecutorsConf = "spark.dynamicAllocation.minExecutors"
86-
val initialExecutorsConf = "spark.dynamicAllocation.initialExecutors"
87-
val maxExecutorsConf = "spark.dynamicAllocation.maxExecutors"
88-
val minNumExecutors = sparkConf.getInt(minExecutorsConf, 0)
89-
val initialNumExecutors = sparkConf.getInt(initialExecutorsConf, minNumExecutors)
90-
val maxNumExecutors = sparkConf.getInt(maxExecutorsConf, Integer.MAX_VALUE)
91-
92-
// If defined, initial executors must be between min and max
93-
if (initialNumExecutors < minNumExecutors || initialNumExecutors > maxNumExecutors) {
94-
throw new IllegalArgumentException(
95-
s"$initialExecutorsConf must be between $minExecutorsConf and $maxNumExecutors!")
96-
}
97-
98-
numExecutors = initialNumExecutors
99-
} else {
100-
val numExecutorsConf = "spark.executor.instances"
101-
numExecutors = sparkConf.getInt(numExecutorsConf, numExecutors)
102-
}
84+
numExecutors = YarnSparkHadoopUtil.getInitialTargetExecutorNumber(sparkConf)
10385
principal = Option(principal)
10486
.orElse(sparkConf.getOption("spark.yarn.principal"))
10587
.orNull

yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -89,11 +89,7 @@ private[yarn] class YarnAllocator(
8989
@volatile private var numExecutorsFailed = 0
9090

9191
@volatile private var targetNumExecutors =
92-
if (Utils.isDynamicAllocationEnabled(sparkConf)) {
93-
sparkConf.getInt("spark.dynamicAllocation.initialExecutors", 0)
94-
} else {
95-
sparkConf.getInt("spark.executor.instances", YarnSparkHadoopUtil.DEFAULT_NUMBER_EXECUTORS)
96-
}
92+
YarnSparkHadoopUtil.getInitialTargetExecutorNumber(sparkConf)
9793

9894
// Keep track of which container is running which executor to remove the executors later
9995
// Visible for testing.

yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -278,5 +278,28 @@ object YarnSparkHadoopUtil {
278278
def getClassPathSeparator(): String = {
279279
classPathSeparatorField.get(null).asInstanceOf[String]
280280
}
281+
282+
/**
283+
* Getting the initial target number of executors depends on whether dynamic allocation is
284+
* enabled.
285+
*/
286+
def getInitialTargetExecutorNumber(conf: SparkConf): Int = {
287+
if (Utils.isDynamicAllocationEnabled(conf)) {
288+
val minNumExecutors = conf.getInt("spark.dynamicAllocation.minExecutors", 0)
289+
val initialNumExecutors =
290+
conf.getInt("spark.dynamicAllocation.initialExecutors", minNumExecutors)
291+
val maxNumExecutors = conf.getInt("spark.dynamicAllocation.maxExecutors", Int.MaxValue)
292+
require(initialNumExecutors >= minNumExecutors && initialNumExecutors <= maxNumExecutors,
293+
s"initial executor number $initialNumExecutors must be between min executor number " +
282+
s"$minNumExecutors and max executor number $maxNumExecutors")
295+
296+
initialNumExecutors
297+
} else {
298+
val targetNumExecutors =
299+
sys.env.get("SPARK_EXECUTOR_INSTANCES").map(_.toInt).getOrElse(DEFAULT_NUMBER_EXECUTORS)
300+
// System property can override environment variable.
301+
conf.getInt("spark.executor.instances", targetNumExecutors)
302+
}
303+
}
281304
}
282305

yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala

Lines changed: 2 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -17,21 +17,13 @@
1717

1818
package org.apache.spark.scheduler.cluster
1919

20-
import java.net.NetworkInterface
21-
2220
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
23-
24-
import scala.collection.JavaConverters._
25-
26-
import org.apache.hadoop.yarn.api.records.NodeState
27-
import org.apache.hadoop.yarn.client.api.YarnClient
2821
import org.apache.hadoop.yarn.conf.YarnConfiguration
2922

3023
import org.apache.spark.SparkContext
3124
import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil
32-
import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._
3325
import org.apache.spark.scheduler.TaskSchedulerImpl
34-
import org.apache.spark.util.{IntParam, Utils}
26+
import org.apache.spark.util.Utils
3527

3628
private[spark] class YarnClusterSchedulerBackend(
3729
scheduler: TaskSchedulerImpl,
@@ -40,13 +32,7 @@ private[spark] class YarnClusterSchedulerBackend(
4032

4133
override def start() {
4234
super.start()
43-
totalExpectedExecutors = DEFAULT_NUMBER_EXECUTORS
44-
if (System.getenv("SPARK_EXECUTOR_INSTANCES") != null) {
45-
totalExpectedExecutors = IntParam.unapply(System.getenv("SPARK_EXECUTOR_INSTANCES"))
46-
.getOrElse(totalExpectedExecutors)
47-
}
48-
// System property can override environment variable.
49-
totalExpectedExecutors = sc.getConf.getInt("spark.executor.instances", totalExpectedExecutors)
35+
totalExpectedExecutors = YarnSparkHadoopUtil.getInitialTargetExecutorNumber(sc.conf)
5036
}
5137

5238
override def applicationId(): String =

0 commit comments

Comments (0)