
Conversation

@liutang123
Contributor

@liutang123 liutang123 commented Mar 18, 2019

What changes were proposed in this pull request?

spark.task.cpus should be less than or equal to spark.executor.cores when using static executor allocation.
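
For example, a configuration this check rejects (an illustrative command using the same flags discussed later in this thread):

    $SPARK_HOME/bin/spark-shell --conf spark.executor.cores=1 --conf spark.task.cpus=2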

How was this patch tested?

manual

@AmplabJenkins

Can one of the admins verify this patch?

Member


(Why not?)

Contributor Author

@liutang123 liutang123 Mar 20, 2019


Because when running in local mode, just 1 core is available.

scala> sc.setLogLevel("INFO")
scala> sc.parallelize(1 to 9).collect

You can see Spark will hang after the log line INFO TaskSchedulerImpl: Adding task set 0.0 with 1 tasks.
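
The hang follows from the scheduler's offer check: a task launches only when some offer has at least spark.task.cpus free cores. A minimal sketch of that condition (a simplified illustration, not the actual TaskSchedulerImpl code):

    val cpusPerTask = 2           // spark.task.cpus
    val availableCpus = Seq(1)    // plain "local" offers a single core
    val canLaunch = availableCpus.exists(_ >= cpusPerTask)
    // canLaunch is false on every offer, so the task set waits forever.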

Member


I see, local == local[1]

Member


Here and below, I don't think "local[...]" adds much beyond threadCount in the message.

Contributor Author


Please pardon me as my English isn't very good.
I do not understand this comment.

Member


You already report threadCount in the message; local[threads] doesn't add information. It can be removed.

Contributor Author


Sometimes threadCount is not the same as threads; for example, with local[*] the thread count resolves to the number of available processors.

@liutang123 liutang123 changed the title [SPARK-27192][Core] spark.task.cpus should be less or equal than spar… [SPARK-27192][Core] spark.task.cpus should be less or equal than spark.executor.cores Mar 20, 2019
Member


Can you just modify this little utility method to take a "cores" parameter and then use it in all the cases below? It can default to sc.conf.get(EXECUTOR_CORES), and then below you can set it to 1 for the local case, for example.
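
A minimal sketch of what that could look like (hypothetical name and call sites, using only the public SparkConf API):

    import org.apache.spark.{SparkConf, SparkException}

    // Take the effective core count as a parameter so every master
    // pattern can reuse the same check.
    def checkCpusPerTask(conf: SparkConf, cores: Int): Unit = {
      val taskCpus = conf.getInt("spark.task.cpus", 1)
      if (taskCpus > cores) {
        throw new SparkException(s"spark.task.cpus ($taskCpus) must be <= $cores.")
      }
    }

    // e.g. checkCpusPerTask(conf, 1) for "local",
    //      checkCpusPerTask(conf, threadCount) for "local[N]",
    //      checkCpusPerTask(conf, conf.getInt("spark.executor.cores", 1)) otherwise.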

@jiangxb1987
Contributor

Have you noticed this check in SparkConf?

if (contains(EXECUTOR_CORES) && contains(CPUS_PER_TASK)) {
  val executorCores = get(EXECUTOR_CORES)
  val taskCpus = get(CPUS_PER_TASK)
  if (executorCores < taskCpus) {
    throw new SparkException(
      s"${EXECUTOR_CORES.key} must not be less than ${CPUS_PER_TASK.key}.")
  }
}
Why do we still want to check in SparkContext?

@liutang123
Contributor Author

liutang123 commented Mar 21, 2019

@jiangxb1987 Thanks for review.
Sorry, I hadn't noticed the checking logic in SparkConf, but I think that checking logic is incomplete for local mode.
For example:
case 1:

$SPARK_HOME/bin/spark-shell --master local[3] --conf spark.task.cpus=2 --conf spark.executor.cores=1

local[3] means the executor's core count is 3, but #23290's logic will still throw an exception.
case 2:

$SPARK_HOME/bin/spark-shell --master local --conf spark.task.cpus=2 --conf spark.executor.cores=3
scala> sc.setLogLevel("INFO")
scala> sc.parallelize(1 to 9).collect

You can see Spark will hang after the log line INFO TaskSchedulerImpl: Adding task set 0.0 with 1 tasks, but the checking logic in #23290 cannot catch this case.
So, I think we can check spark.task.cpus before creating the TaskScheduler.

Member


I don't think you need two messages here. Just have one stating that ${CPUS_PER_TASK.key} must be <= the $executorCoreNum cores available per executor.

Contributor Author


executorCoreNum is decided by local[N] or --conf spark.executor.cores=M; I think we should mention this in the exception.
And if the user sets both --master local[N] and --conf spark.executor.cores=M, I think we should ignore the latter.

Member


Is it possible to at least add back this test, for SparkContext?

Member


@liutang123 let's address this or close it; some of this is already checked

Contributor Author

@liutang123 liutang123 Mar 28, 2019


Hi @srowen, I added a UT in SparkContextSuite, which I think is more reasonable than this one.
If you think #23290 needs to be retained, this PR only needs to check local mode.
But imagine this case:
spark.executor.cores is set to 1 in conf/spark-defaults.conf as the default conf.
If the user runs spark-shell --master local[6] --conf spark.task.cpus=2, the checking logic in #23290 will throw an exception. This forces the user to set spark.executor.cores to at least 2, even though spark.executor.cores is meaningless in local mode. So, I think we can check spark.task.cpus before creating the SchedulerBackend and TaskScheduler - but what do you think?
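
A minimal sketch of such a master-aware check (a hypothetical helper; the local[*] handling and the default of 1 for spark.executor.cores are assumptions, and the merged change implements this inside SparkContext rather than as a free-standing method):

    import org.apache.spark.{SparkConf, SparkException}

    // Validate spark.task.cpus against the cores the chosen master actually
    // provides, ignoring spark.executor.cores in local mode.
    def checkCpusPerTask(conf: SparkConf, master: String): Unit = {
      val LocalN = """local\[([0-9]+|\*)\]""".r
      val effectiveCores = master match {
        case "local"     => 1
        case LocalN("*") => Runtime.getRuntime.availableProcessors()
        case LocalN(n)   => n.toInt
        case _           => conf.getInt("spark.executor.cores", 1) // default is an assumption
      }
      val taskCpus = conf.getInt("spark.task.cpus", 1)
      if (taskCpus > effectiveCores) {
        throw new SparkException(
          s"spark.task.cpus ($taskCpus) must be <= the $effectiveCores cores available per executor.")
      }
    }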

Member


Let's flip this around and have a test method that iterates over the possibilities.
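
A sketch of what such a data-driven test could look like (illustrative cases, assumed to run inside a ScalaTest suite such as SparkContextSuite):

    import org.apache.spark.{SparkConf, SparkContext, SparkException}

    Seq(
      ("local", 2, true),      // 1 core available, task needs 2 -> should fail
      ("local[2]", 2, false),  // 2 cores available, task needs 2 -> ok
      ("local[2]", 3, true)    // 2 cores available, task needs 3 -> should fail
    ).foreach { case (master, taskCpus, shouldFail) =>
      val conf = new SparkConf().setAppName("test").setMaster(master)
        .set("spark.task.cpus", taskCpus.toString)
      if (shouldFail) {
        val ex = intercept[SparkException] { new SparkContext(conf) }
        assert(ex.getMessage.contains("spark.task.cpus"))
      } else {
        new SparkContext(conf).stop()
      }
    }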

Member


Break the body onto a new line. Also, can you assert that the message contains a short substring that indicates the error is the expected one?
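
For instance (the asserted substring is an assumption based on the messages quoted earlier in this thread):

    val ex = intercept[SparkException] {
      sc = new SparkContext(conf)  // body broken onto its own line
    }
    assert(ex.getMessage.contains("spark.task.cpus"))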

@srowen
Member

srowen commented Mar 30, 2019

Merged to master

@srowen srowen closed this in f8fa564 Mar 30, 2019
@dongjoon-hyun
Member

dongjoon-hyun commented Mar 30, 2019

Hi, All.
This doesn't pass Jenkins, and it currently breaks the master branch due to UT failures. I'll revert this.

@srowen
Member

srowen commented Mar 30, 2019

Ugh, another error. I think I looked at the wrong PR when checking whether it passed. I'll revert as needed.

@dongjoon-hyun
Member

Oh, I reverted it already~

@dongjoon-hyun
Member

@liutang123, could you check the following tests and make another PR, please?

  • org.apache.spark.BarrierStageOnSubmittedSuite.submit a barrier ResultStage that requires more slots than current total under local-cluster mode
  • org.apache.spark.BarrierStageOnSubmittedSuite.submit a barrier ShuffleMapStage that requires more slots than current total under local-cluster mode
  • org.apache.spark.scheduler.CoarseGrainedSchedulerBackendSuite.compute max number of concurrent tasks can be launched when spark.task.cpus > 1
  • org.apache.spark.scheduler.CoarseGrainedSchedulerBackendSuite.compute max number of concurrent tasks can be launched when some executors are busy
  • org.apache.spark.scheduler.TaskSchedulerImplSuite.Scheduler correctly accounts for multiple CPUs per task
  • org.apache.spark.scheduler.TaskSchedulerImplSuite.Scheduler does not crash when tasks are not serializable
  • org.apache.spark.scheduler.TaskSchedulerImplSuite.don't schedule for a barrier taskSet if available slots are less than pending tasks
  • org.apache.spark.scheduler.TaskSchedulerImplSuite.schedule tasks for a barrier taskSet if all tasks can be launched together
