core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
@@ -222,9 +222,6 @@ private[spark] class ExecutorAllocationManager(
throw new SparkException("Dynamic allocation of executors requires the external " +
"shuffle service. You may enable this through spark.shuffle.service.enabled.")
}
if (tasksPerExecutorForFullParallelism == 0) {
throw new SparkException(s"${EXECUTOR_CORES.key} must not be < ${CPUS_PER_TASK.key}.")
}

if (executorAllocationRatio > 1.0 || executorAllocationRatio <= 0.0) {
throw new SparkException(
10 changes: 0 additions & 10 deletions core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -577,16 +577,6 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging with Serializable {
}
}

if (contains(EXECUTOR_CORES) && contains(CPUS_PER_TASK)) {
val executorCores = get(EXECUTOR_CORES)
val taskCpus = get(CPUS_PER_TASK)

if (executorCores < taskCpus) {
throw new SparkException(
s"${EXECUTOR_CORES.key} must not be less than ${CPUS_PER_TASK.key}.")
}
}

val encryptionEnabled = get(NETWORK_CRYPTO_ENABLED) || get(SASL_ENCRYPTION_ENABLED)
require(!encryptionEnabled || get(NETWORK_AUTH_ENABLED),
s"${NETWORK_AUTH_ENABLED.key} must be enabled when enabling encryption.")
22 changes: 22 additions & 0 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -2665,8 +2665,25 @@ object SparkContext extends Logging {
// When running locally, don't try to re-execute tasks on failure.
val MAX_LOCAL_TASK_FAILURES = 1

// SPARK-26340: Ensure that executor's core num meets at least one task requirement.
def checkCpusPerTask(
executorCoreNum: Int = sc.conf.get(EXECUTOR_CORES),
clusterMode: Boolean = true): Unit = {
val cpusPerTask = sc.conf.get(CPUS_PER_TASK)
if (executorCoreNum < cpusPerTask) {
val message = if (clusterMode) {
Member (review comment):
I don't think you need two messages here. Just have one stating that ${CPUS_PER_TASK.key} must be <= the $executorCoreNum cores available per executor.

Contributor Author (reply):
executorCoreNum is decided by local[N] or --conf spark.executor.cores=M, so I think we should include that in the exception message. And if the user sets both --master local[N] and --conf spark.executor.cores=M, I think we should ignore the latter.
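For illustration only, not part of the diff: a minimal standalone sketch of the single-message variant the reviewer suggests above. The object and method names here are hypothetical; only SparkException and the spark.task.cpus semantics come from Spark itself.

import org.apache.spark.SparkException

object CpusPerTaskCheckSketch {
  // One message covering both local and cluster mode, as suggested above.
  // `executorCoreNum` is whatever the master actually provides:
  // N for local[N], spark.executor.cores otherwise.
  def check(executorCoreNum: Int, cpusPerTask: Int, master: String): Unit = {
    if (executorCoreNum < cpusPerTask) {
      throw new SparkException(
        s"spark.task.cpus ($cpusPerTask) must be <= the $executorCoreNum cores " +
          s"available per executor when running on $master.")
    }
  }
}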

s"${CPUS_PER_TASK.key} must be <= ${EXECUTOR_CORES.key} when run on $master."
} else {
s"Only $executorCoreNum cores available per executor when run on $master," +
s" and ${CPUS_PER_TASK.key} must be <= it."
}
throw new SparkException(message)
}
}

master match {
case "local" =>
checkCpusPerTask(1, clusterMode = false)
val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true)
val backend = new LocalSchedulerBackend(sc.getConf, scheduler, 1)
scheduler.initialize(backend)
@@ -2679,6 +2696,7 @@
if (threadCount <= 0) {
throw new SparkException(s"Asked to run locally with $threadCount threads")
}
checkCpusPerTask(threadCount, clusterMode = false)
val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true)
val backend = new LocalSchedulerBackend(sc.getConf, scheduler, threadCount)
scheduler.initialize(backend)
@@ -2689,19 +2707,22 @@
// local[*, M] means the number of cores on the computer with M failures
// local[N, M] means exactly N threads with M failures
val threadCount = if (threads == "*") localCpuCount else threads.toInt
checkCpusPerTask(threadCount, clusterMode = false)
val scheduler = new TaskSchedulerImpl(sc, maxFailures.toInt, isLocal = true)
val backend = new LocalSchedulerBackend(sc.getConf, scheduler, threadCount)
scheduler.initialize(backend)
(backend, scheduler)

case SPARK_REGEX(sparkUrl) =>
checkCpusPerTask()
val scheduler = new TaskSchedulerImpl(sc)
val masterUrls = sparkUrl.split(",").map("spark://" + _)
val backend = new StandaloneSchedulerBackend(scheduler, sc, masterUrls)
scheduler.initialize(backend)
(backend, scheduler)

case LOCAL_CLUSTER_REGEX(numSlaves, coresPerSlave, memoryPerSlave) =>
checkCpusPerTask()
// Check to make sure memory requested <= memoryPerSlave. Otherwise Spark will just hang.
val memoryPerSlaveInt = memoryPerSlave.toInt
if (sc.executorMemory > memoryPerSlaveInt) {
@@ -2722,6 +2743,7 @@
(backend, scheduler)

case masterUrl =>
checkCpusPerTask()
val cm = getClusterManager(masterUrl) match {
case Some(clusterMgr) => clusterMgr
case None => throw new SparkException("Could not parse Master URL: '" + master + "'")
7 changes: 0 additions & 7 deletions core/src/test/scala/org/apache/spark/SparkConfSuite.scala
@@ -140,13 +140,6 @@ class SparkConfSuite extends SparkFunSuite with LocalSparkContext with ResetSystemProperties {
assert(sc.appName === "My other app")
}

test("creating SparkContext with cpus per tasks bigger than cores per executors") {
Member (review comment):
Is it possible to at least add back this test, for SparkContext?

Member (review comment):
@liutang123 let's address this or close it; some of this is already checked.

Contributor Author (@liutang123, Mar 28, 2019):
Hi @srowen, I added a unit test in SparkContextSuite which I think is more reasonable than this one.
If you think #23290 needs to be retained, this PR only requires checking local mode.
But imagine this case: spark.executor.cores 1 is set in conf/spark-defaults.conf as the default. If a user runs spark-shell --master local[6] --conf spark.task.cpus=2, the checking logic from #23290 throws an exception, forcing the user to raise spark.executor.cores to at least 2 even though spark.executor.cores has no effect in local mode. So I think we can check spark.task.cpus just before creating the SchedulerBackend and TaskScheduler - what do you think?
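For illustration only, not part of the diff: a spark-shell style sketch of the scenario described in the comment above. Setting spark.executor.cores explicitly stands in for the conf/spark-defaults.conf entry; the behaviour notes in the comments restate what the removed SparkConf check and the new SparkContext check do in this case.

import org.apache.spark.{SparkConf, SparkContext}

// Simulates spark.executor.cores=1 coming from conf/spark-defaults.conf;
// in local mode this setting does not affect the actual core count.
val conf = new SparkConf()
  .set("spark.executor.cores", "1")
  .set("spark.task.cpus", "2")

// The check removed from SparkConf.scala in this diff would throw here (1 < 2),
// regardless of the master. The check added to SparkContext.createTaskScheduler
// sees the 6 cores provided by local[6], and 2 <= 6, so this succeeds.
val sc = new SparkContext("local[6]", "cpus-per-task-example", conf)
sc.stop()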

val conf = new SparkConf(false)
.set(EXECUTOR_CORES, 1)
.set(CPUS_PER_TASK, 2)
intercept[SparkException] { sc = new SparkContext(conf) }
}

test("nested property names") {
// This wasn't supported by some external conf parsing libraries
System.setProperty("spark.test.a", "a")
21 changes: 21 additions & 0 deletions core/src/test/scala/org/apache/spark/SparkContextSuite.scala
@@ -710,6 +710,27 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventually {
assert(runningTaskIds.isEmpty)
}
}

test(s"Avoid setting ${CPUS_PER_TASK.key} unreasonably (SPARK-27192)") {
val FAIL_REASON = s"${CPUS_PER_TASK.key} must be <="
Seq(
("local", 2, None),
("local[2]", 3, None),
("local[2, 1]", 3, None),
("spark://test-spark-cluster", 2, Option(1)),
("local-cluster[1, 1, 1000]", 2, Option(1)),
("yarn", 2, Option(1))
).foreach { case (master, cpusPerTask, executorCores) =>
val conf = new SparkConf()
conf.set(CPUS_PER_TASK, cpusPerTask)
executorCores.map(executorCores => conf.set(EXECUTOR_CORES, executorCores))
val ex = intercept[SparkException] {
sc = new SparkContext(master, "test", conf)
}
assert(ex.getMessage.contains(FAIL_REASON))
resetSparkContext()
}
}
}

object SparkContextSuite {