
Conversation

@wangyum (Member) commented Feb 6, 2017

What changes were proposed in this pull request?

Dynamically set spark.dynamicAllocation.maxExecutors based on cluster resources.
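
For reference, a minimal sketch of the idea (not the exact patch; sparkConf and yarnClient are assumed to be already initialized, and the derived value is only a cap, not a guarantee of resources):

import scala.collection.JavaConverters._
import org.apache.hadoop.yarn.api.records.NodeState

// Derive an upper bound for spark.dynamicAllocation.maxExecutors from the
// vcores of the currently RUNNING NodeManagers.
val executorCores = sparkConf.getInt("spark.executor.cores", 1)
val clusterCores = yarnClient.getNodeReports(NodeState.RUNNING).asScala
  .map(_.getCapability.getVirtualCores)
  .sum
val maxNumExecutors = math.max(clusterCores / executorCores, 1)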

How was this patch tested?

Manual test and unit test.

@srowen (Member) commented Feb 6, 2017

I don't think this is a necessary change. Already, you can't ask for more resources than the cluster has; the cluster won't grant them. Capping it here means the app can't use more resources if the cluster suddenly gets more.

I see the problem you're trying to solve, but the resource manager already ramps up requests slowly, so I don't think this is the issue.

@SparkQA commented Feb 6, 2017

Test build #72434 has finished for PR 16819 at commit 97e5eee.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@andrewor14 (Contributor)

I agree. Resource managers already expect applications to request more than what's available, so we don't have to do it again ourselves in Spark.

@wangyum (Member, Author) commented Feb 7, 2017

It will reduce the number of calls to CoarseGrainedSchedulerBackend.requestTotalExecutors() after applying this PR:
(screenshots: before vs. after applying this PR)

Full log can be found here.
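
To illustrate why a lower cap cuts down on those calls, here is a simplified sketch; it is not the actual ExecutorAllocationManager code, just the shape of the capping behavior:

// Simplified sketch: once the target reaches the cap, further "add" rounds
// return 0 and skip requestTotalExecutors instead of ramping the target up.
val maxNumExecutors = 94              // e.g. derived from cluster resources
var numExecutorsTarget = 1
def addExecutors(maxNeeded: Int): Int = {
  val newTarget = math.min(maxNeeded, maxNumExecutors)
  if (newTarget == numExecutorsTarget) {
    0                                 // "Not adding executors ... (limit 94)"
  } else {
    val delta = newTarget - numExecutorsTarget
    numExecutorsTarget = newTarget
    delta                             // this is when requestTotalExecutors fires
  }
}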

@srowen (Member) commented Feb 7, 2017

What problem does this solve though? Calling that function is not a problem. It seems like you get the right behavior in both cases. Are you saying there's some RPC problem? The target goes very high, but, as far as I can see, it's correctly reflecting the fact that the app would use a lot of executors if it could -- that's fine.

@wangyum (Member, Author) commented Feb 20, 2017

@srowen Dynamically setting spark.dynamicAllocation.maxExecutors can avoid some strange problems:

  1. Spark application hang when dynamic allocation is enabled
  2. Report failure reason from Reporter Thread
  3. CLI shows success but the web UI didn't, similar to this

I added a unit test just now.

@SparkQA commented Feb 20, 2017

Test build #73147 has finished for PR 16819 at commit 4f81680.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Feb 20, 2017

Test build #73151 has finished for PR 16819 at commit 8e99701.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

val defaultMaxNumExecutors = DYN_ALLOCATION_MAX_EXECUTORS.defaultValue.get
if (defaultMaxNumExecutors == sparkConf.get(DYN_ALLOCATION_MAX_EXECUTORS)) {
  val executorCores = sparkConf.getInt("spark.executor.cores", 1)
  val maxNumExecutors = yarnClient.getNodeReports().asScala.
A Contributor commented on this diff:

Shouldn't we take queue's maxResources amount into account from ResourceManager REST APIs?
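
For context, the REST route referred to here would be the ResourceManager scheduler endpoint; a minimal sketch of fetching it (the RM address is a placeholder, and the returned JSON still has to be parsed for the per-queue limits such as maxResources):

import scala.io.Source

// Query the ResourceManager scheduler REST API; the response describes the
// queue hierarchy and its resource limits.
val rmWebAppAddress = "http://resourcemanager-host:8088"  // placeholder address
val schedulerJson = Source.fromURL(s"$rmWebAppAddress/ws/v1/cluster/scheduler").mkString
println(schedulerJson)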

@wangyum (Member, Author) replied:

Good suggestion. I will try the API first. Pseudo code:

import scala.collection.JavaConverters._

import org.apache.hadoop.yarn.client.api.YarnClient
import org.apache.hadoop.yarn.conf.YarnConfiguration

val yarnConf = new YarnConfiguration()
val yarnClient = YarnClient.createYarnClient
yarnClient.init(yarnConf)
yarnClient.start()
// Root queues expose their configured capacity limits, which could serve as the cap.
val rootQueues = yarnClient.getRootQueueInfos.asScala
rootQueues.foreach(q => println(s"${q.getQueueName}: maxCapacity=${q.getMaximumCapacity}"))

@SparkQA commented Feb 22, 2017

Test build #73277 has finished for PR 16819 at commit fabe2c5.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Feb 22, 2017

Test build #73282 has finished for PR 16819 at commit cd306e2.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@vanzin (Contributor) commented Feb 22, 2017

I agree there's room for improvement in the current code; I even asked for SPARK-18769 to be filed to track that work.

But I don't think setting the max to a fixed value at startup is the right approach. Queue configs change, node managers go up and down, new ones are added, old ones are removed. If this value ends up being calculated at the wrong time, the application will suffer. If you want to investigate a more dynamic approach here I'm all for that, but I'm not a big fan of the current solution.

@wangyum (Member, Author) commented Feb 23, 2017

@vanzin We must pull the configuration from the ResourceManager; the ResourceManager can't push it. So should we set the max before each stage? That feels too frequent.

In fact, this approach is suitable for periodic tasks, e.g. ML and SQL jobs. For streaming jobs, it is better to set it manually.

@vanzin (Contributor) commented Feb 23, 2017

Getting the config only at the beginning, to me, is not an acceptable solution.

Getting it every once in a while is better, but it's not the only possible approach. I even suggested something different in the bug I mentioned above.

@SparkQA commented Feb 27, 2017

Test build #73515 has finished for PR 16819 at commit e4b3b0c.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@tgravescs (Contributor)

I agree with others, this is not the way to do this. There are different schedulers in YARN, each with different configs that could affect the actual resources you get.

If you want to do something like this, it should look at the available resources after the allocate call to YARN (allocateResponse.getAvailableResources). When YARN responds, it tells you the available resources, which takes the various scheduler settings into account.

MapReduce refers to that as headroom and uses it to determine things like whether it needs to kill a reducer to run a map. We could use this to help with dynamic allocation and do more intelligent things.
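
A minimal sketch of that suggestion (illustrative names, not the actual YarnAllocator code; the response is assumed to come from the AMRMClient allocate call):

import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse
import org.apache.hadoop.yarn.api.records.Resource

// Convert the RM-reported headroom into an executor count; the headroom
// already reflects queue limits and other scheduler constraints.
def headroomExecutors(response: AllocateResponse,
                      executorMemoryMb: Int,
                      executorCores: Int): Int = {
  val available: Resource = response.getAvailableResources
  val byMemory = available.getMemory / executorMemoryMb
  val byCores = available.getVirtualCores / executorCores
  math.min(byMemory, byCores)
}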

@wangyum (Member, Author) commented Feb 28, 2017

@vanzin What do you think about the current approach? I have tested it on the same Spark hive-thriftserver; spark.dynamicAllocation.maxExecutors will decrease if I kill 4 NodeManagers:

17/02/27 15:58:08 DEBUG ExecutorAllocationManager: Not adding executors because our current target total is already 94 (limit 94)
17/02/27 15:58:09 DEBUG ExecutorAllocationManager: Not adding executors because our current target total is already 94 (limit 94)
17/02/27 16:05:49 DEBUG ExecutorAllocationManager: Not adding executors because our current target total is already 85 (limit 85)
17/02/27 16:05:49 DEBUG ExecutorAllocationManager: Not adding executors because our current target total is already 85 (limit 85)

@vanzin (Contributor) commented Mar 2, 2017

So your current approach is to have a second connection to the RM, and ask for the RM's available resources every time the scheduler tries to change the number of resources.

Did you look at Tom's suggestion of using AllocateResponse.getAvailableResources() instead? Seems like it would be simpler, cheaper, and could all be handled internally in YarnAllocator.scala.

srowen added a commit to srowen/spark that referenced this pull request Mar 22, 2017
@srowen mentioned this pull request Mar 22, 2017
@asfgit closed this in b70c03a Mar 23, 2017