@@ -232,6 +232,14 @@ private[spark] class ExecutorAllocationManager(
executor.scheduleAtFixedRate(scheduleTask, 0, intervalMillis, TimeUnit.MILLISECONDS)
}

/**
* Change the value of numExecutorsTarget.

Contributor: This is too vague... Suggested wording:

    Reset this manager to the initial starting state.
    This must be called if the cluster manager is restarted.

*/
def reSetNumExecutorsTarget(): Unit = {

Comment: reset?

Contributor: +1 reset

Contributor: I would just call this reset(), since it needs to do more than just set the target.

logDebug(s"Now reset the value of numExecutorsTarget.")

Contributor: Indentation is off.

numExecutorsTarget = conf.getInt("spark.dynamicAllocation.initialExecutors", minNumExecutors)

Contributor: If I understand correctly what's going on, this will result in addExecutors issuing a request for the desired number of executors if that does not match spark.dynamicAllocation.initialExecutors, right? That same method will also bring numExecutorsTarget back up to whatever the previous value was once it runs.

Instead, wouldn't it be clearer to just explicitly send a message to the AM saying "this is the current state of the world, initialize yourself to deal with it"? Maybe as a reply to the RegisterClusterManager message.

Also, YarnAllocator holds a lot of state about existing containers, such as allocatedHostToContainersMap and releasedContainers. Is that data re-created somehow when the new AM comes up? If not, what are the side effects, if any, of not having that information around?

Contributor: I actually think we should reset on the driver side, as this patch has done, since we end up calling updateAndSync anyway.

}

/**
* Stop the allocation manager.
*/
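
Several reviewers above suggest renaming the method to reset() and point out that it should restore more than just the target. As a rough sketch only: assuming the manager also tracks numExecutorsToAdd, executorsPendingToRemove, and removeTimes (those field names are not shown in this diff, so treat them as assumptions), a fuller reset might look roughly like this:

    /**
     * Reset this manager to its initial starting state. Intended to be called when the
     * cluster manager (e.g. a restarted YARN ApplicationMaster) has lost its view of the
     * executors that were previously requested.
     */
    def reset(): Unit = synchronized {
      // Fall back to the configured starting target instead of whatever we had ramped up to.
      numExecutorsTarget = conf.getInt("spark.dynamicAllocation.initialExecutors", minNumExecutors)
      // Restart the exponential ramp-up from one executor per scheduling round.
      numExecutorsToAdd = 1
      // Drop in-flight remove requests and idle timers; the new cluster manager never saw them.
      executorsPendingToRemove.clear()
      removeTimes.clear()
    }

Whether the remove-side bookkeeping really needs clearing depends on how the restarted cluster manager reports existing executors, which is exactly the question raised above about YarnAllocator's container state.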
@@ -170,6 +170,7 @@ private[spark] abstract class YarnSchedulerBackend(
case RegisterClusterManager(am) =>
logInfo(s"ApplicationMaster registered as $am")
amEndpoint = Some(am)
scheduler.sc.executorAllocationManager.foreach(_.reSetNumExecutorsTarget())

Contributor: Here we also need to clear things in CoarseGrainedSchedulerBackend.

case AddWebUIFilter(filterName, filterParams, proxyBase) =>
addWebUIFilter(filterName, filterParams, proxyBase)
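
Following the comment above that CoarseGrainedSchedulerBackend state also needs clearing, one possible shape for the handler is sketched below. The reset() hooks on both the backend and the allocation manager are assumptions for illustration; this diff only adds the call into the allocation manager:

    case RegisterClusterManager(am) =>
      logInfo(s"ApplicationMaster registered as $am")
      amEndpoint = Some(am)
      // A re-registered AM has no record of previously requested executors, so bring the
      // driver-side bookkeeping back to a known starting point as well.
      reset()  // hypothetical hook clearing pending/known-executor state in CoarseGrainedSchedulerBackend
      scheduler.sc.executorAllocationManager.foreach(_.reset())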