26 commits
68fc364  [SPARK-32919][CORE] Driver side changes for coordinating push based s… (venkata91, Aug 6, 2020)
2688df2  Empty commit to add Min Shen to authors list (Victsm, Oct 27, 2020)
0423970  Addressed some of the comments both from Mridul and Thomas (venkata91, Nov 2, 2020)
eb93fe1  Address style check issues (venkata91, Nov 3, 2020)
f2f61e2  Address style checks and Mridul's comment (venkata91, Nov 3, 2020)
3a6219f  Address Tom's comment (venkata91, Nov 3, 2020)
ca44d03  Prefer active executors for merger locations (venkata91, Nov 5, 2020)
2172f65  Addressed comments from ngone51 and otterc (venkata91, Nov 5, 2020)
1e76824  Address ngone51 review comments (venkata91, Nov 6, 2020)
3e03109  Address Mridul's review comments (venkata91, Nov 9, 2020)
cbb66eb  Merge remote-tracking branch 'upstream/master' into upstream-SPARK-32919 (venkata91, Nov 9, 2020)
a1c8831  Addressed review comments (venkata91, Nov 10, 2020)
047ad0c  Address review comments (venkata91, Nov 11, 2020)
2d6d266  Address ngone51 review comments (venkata91, Nov 12, 2020)
2b0c073  pick hosts in random order and remove host from shuffle push merger if a (venkata91, Nov 12, 2020)
9ba4dfb  Addressed ngone51 comments (venkata91, Nov 13, 2020)
5127d8b  Address review comments (venkata91, Nov 15, 2020)
e320ac0  Address attila comments (venkata91, Nov 17, 2020)
a2d85ef  Addressed ngone51 review comment (venkata91, Nov 17, 2020)
affa8a0  Address attilapiros review comments (venkata91, Nov 17, 2020)
46f5670  Add test in UtilsSuite to check push based shuffle is enabled or not (venkata91, Nov 18, 2020)
2467a61  Merge branch 'upstream-master' into upstream-SPARK-32919 (venkata91, Nov 18, 2020)
050a5ae  Address Dongjoon comments (venkata91, Nov 18, 2020)
1714829  Merge branch 'upstream-master' into upstream-SPARK-32919 (venkata91, Nov 18, 2020)
1ba7668  Merge remote-tracking branch 'upstream/master' into upstream-SPARK-32919 (venkata91, Nov 19, 2020)
5ce2934  Merge remote-tracking branch 'upstream/master' into upstream-SPARK-32919 (venkata91, Nov 20, 2020)
15 changes: 15 additions & 0 deletions core/src/main/scala/org/apache/spark/Dependency.scala
@@ -23,6 +23,7 @@ import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.rdd.RDD
import org.apache.spark.serializer.Serializer
import org.apache.spark.shuffle.{ShuffleHandle, ShuffleWriteProcessor}
import org.apache.spark.storage.BlockManagerId

/**
* :: DeveloperApi ::
@@ -95,6 +96,20 @@ class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
val shuffleHandle: ShuffleHandle = _rdd.context.env.shuffleManager.registerShuffle(
shuffleId, this)

/**
* Stores the location of the list of chosen external shuffle services for handling the
* shuffle merge requests from mappers in this shuffle map stage.
*/
private[spark] var mergerLocs: Seq[BlockManagerId] = Nil

def setMergerLocs(mergerLocs: Seq[BlockManagerId]): Unit = {
if (mergerLocs != null) {
this.mergerLocs = mergerLocs
}
}

def getMergerLocs: Seq[BlockManagerId] = mergerLocs

_rdd.sparkContext.cleaner.foreach(_.registerShuffleForCleanup(this))
_rdd.sparkContext.shuffleDriverComponents.registerShuffle(shuffleId)
}
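A short usage sketch for the new accessors (not code from this PR): `shuffleDep` is an assumed driver-side ShuffleDependency, and SHUFFLE_MERGER_IDENTIFIER comes from the BlockManagerId changes later in this diff; 7337 is the default spark.shuffle.service.port.

```scala
import org.apache.spark.storage.BlockManagerId

// Merger locations are external shuffle services, so their BlockManagerIds
// carry SHUFFLE_MERGER_IDENTIFIER in the executor-id slot.
val mergers = Seq(
  BlockManagerId(BlockManagerId.SHUFFLE_MERGER_IDENTIFIER, "host1", 7337),
  BlockManagerId(BlockManagerId.SHUFFLE_MERGER_IDENTIFIER, "host2", 7337))
shuffleDep.setMergerLocs(mergers)
assert(shuffleDep.getMergerLocs.map(_.host) == Seq("host1", "host2"))
// setMergerLocs ignores null, so a null argument cannot clear prior mergers:
shuffleDep.setMergerLocs(null)
assert(shuffleDep.getMergerLocs.map(_.host) == Seq("host1", "host2"))
```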
47 changes: 47 additions & 0 deletions core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -1945,4 +1945,51 @@ package object config
.version("3.0.1")
.booleanConf
.createWithDefault(false)

private[spark] val PUSH_BASED_SHUFFLE_ENABLED =
ConfigBuilder("spark.shuffle.push.enabled")
.doc("Set to 'true' to enable push-based shuffle on the client side and this works in " +
"conjunction with the server side flag spark.shuffle.server.mergedShuffleFileManagerImpl " +
"which needs to be set with the appropriate " +
"org.apache.spark.network.shuffle.MergedShuffleFileManager implementation for push-based " +
"shuffle to be enabled")
.version("3.1.0")
.booleanConf
.createWithDefault(false)

private[spark] val SHUFFLE_MERGER_MAX_RETAINED_LOCATIONS =
ConfigBuilder("spark.shuffle.push.maxRetainedMergerLocations")
.doc("Maximum number of shuffle push merger locations cached for push based shuffle. " +
"Currently, shuffle push merger locations are nothing but external shuffle services " +
"which are responsible for handling pushed blocks and merging them and serving " +
"merged blocks for later shuffle fetch.")
.version("3.1.0")
.intConf
.createWithDefault(500)

private[spark] val SHUFFLE_MERGER_LOCATIONS_MIN_THRESHOLD_RATIO =
ConfigBuilder("spark.shuffle.push.mergersMinThresholdRatio")
.doc("The minimum number of shuffle merger locations required to enable push based " +
"shuffle for a stage. This is specified as a ratio of the number of partitions in " +
"the child stage. For example, a reduce stage which has 100 partitions and uses the " +
"default value 0.05 requires at least 5 unique merger locations to enable push based " +
"shuffle. Merger locations are currently defined as external shuffle services.")
.version("3.1.0")
.doubleConf
.createWithDefault(0.05)

private[spark] val SHUFFLE_MERGER_LOCATIONS_MIN_STATIC_THRESHOLD =
ConfigBuilder("spark.shuffle.push.mergersMinStaticThreshold")
.doc(s"The static threshold for number of shuffle push merger locations should be " +
"available in order to enable push based shuffle for a stage. Note this config " +
s"works in conjunction with ${SHUFFLE_MERGER_LOCATIONS_MIN_THRESHOLD_RATIO.key}. " +
"Maximum of spark.shuffle.push.mergersMinStaticThreshold and " +
s"${SHUFFLE_MERGER_LOCATIONS_MIN_THRESHOLD_RATIO.key} ratio number of mergers needed to " +
"enable push based shuffle for a stage. For eg: with 1000 partitions for the child " +
"stage with spark.shuffle.push.mergersMinStaticThreshold as 5 and " +
s"${SHUFFLE_MERGER_LOCATIONS_MIN_THRESHOLD_RATIO.key} set to 0.05, we would need " +
"at least 50 mergers to enable push based shuffle for that stage.")
.version("3.1.0")
.doubleConf
.createWithDefault(5)
}
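Taken together, these configs gate push-based shuffle per stage. A minimal sketch of the client-side enablement plus the threshold arithmetic documented above (the server-side spark.shuffle.server.mergedShuffleFileManagerImpl flag must also be set on the shuffle service for pushes to be accepted):

```scala
import org.apache.spark.SparkConf

// Client-side enablement: push-based shuffle also requires the external
// shuffle service (see Utils.isPushBasedShuffleEnabled later in this diff).
val conf = new SparkConf()
  .set("spark.shuffle.service.enabled", "true")
  .set("spark.shuffle.push.enabled", "true")

// The documented threshold arithmetic for a 1000-partition child stage:
val numPartitions = 1000
val ratio = 0.05           // spark.shuffle.push.mergersMinThresholdRatio
val staticThreshold = 5.0  // spark.shuffle.push.mergersMinStaticThreshold
val minMergersNeeded = math.max(numPartitions * ratio, staticThreshold).toInt
assert(minMergersNeeded == 50)
```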
40 changes: 40 additions & 0 deletions core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -249,6 +249,8 @@ private[spark] class DAGScheduler(
private[spark] val eventProcessLoop = new DAGSchedulerEventProcessLoop(this)
taskScheduler.setDAGScheduler(this)

private val pushBasedShuffleEnabled = Utils.isPushBasedShuffleEnabled(sc.getConf)

/**
* Called by the TaskSetManager to report task's starting.
*/
@@ -1252,6 +1254,33 @@ private[spark] class DAGScheduler(
execCores.map(cores => properties.setProperty(EXECUTOR_CORES_LOCAL_PROPERTY, cores))
}

/**
* If push-based shuffle is enabled, set the shuffle services to be used for the given
* shuffle map stage for block push/merge.
*
* Even with dynamic resource allocation kicking in and significantly reducing the number
* of available active executors, we would still be able to get sufficient shuffle service
* locations for block push/merge by getting the historical locations of past executors.
*/
private def prepareShuffleServicesForShuffleMapStage(stage: ShuffleMapStage): Unit = {
// TODO(SPARK-32920) Handle stage reuse/retry cases separately as without finalize
// TODO changes we cannot disable shuffle merge for the retry/reuse cases
val mergerLocs = sc.schedulerBackend.getShufflePushMergerLocations(
stage.shuffleDep.partitioner.numPartitions, stage.resourceProfileId)

if (mergerLocs.nonEmpty) {
stage.shuffleDep.setMergerLocs(mergerLocs)
logInfo(s"Push-based shuffle enabled for $stage (${stage.name}) with" +
s" ${stage.shuffleDep.getMergerLocs.size} merger locations")

logDebug("List of shuffle push merger locations " +
s"${stage.shuffleDep.getMergerLocs.map(_.host).mkString(", ")}")
} else {
logInfo("No available merger locations." +
s" Push-based shuffle disabled for $stage (${stage.name})")
}
}

/** Called when stage's parents are available and we can now do its task. */
private def submitMissingTasks(stage: Stage, jobId: Int): Unit = {
logDebug("submitMissingTasks(" + stage + ")")
@@ -1281,6 +1310,12 @@ private[spark] class DAGScheduler(
stage match {
case s: ShuffleMapStage =>
outputCommitCoordinator.stageStart(stage = s.id, maxPartitionId = s.numPartitions - 1)
// Only generate merger locations for a given shuffle dependency once. This way, even if
// this stage gets retried, it would still be merging blocks using the same set of
// shuffle services.
if (pushBasedShuffleEnabled) {
prepareShuffleServicesForShuffleMapStage(s)
}
case s: ResultStage =>
outputCommitCoordinator.stageStart(
stage = s.id, maxPartitionId = s.rdd.partitions.length - 1)
@@ -2027,6 +2062,11 @@ private[spark] class DAGScheduler(
if (!executorFailureEpoch.contains(execId) || executorFailureEpoch(execId) < currentEpoch) {
executorFailureEpoch(execId) = currentEpoch
logInfo(s"Executor lost: $execId (epoch $currentEpoch)")
if (pushBasedShuffleEnabled) {
// Remove the fetch-failed host from the shuffle push merger list for push-based shuffle
hostToUnregisterOutputs.foreach(
host => blockManagerMaster.removeShufflePushMergerLocation(host))
}
blockManagerMaster.removeExecutor(execId)
clearCacheLocs()
}
core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala
@@ -18,6 +18,7 @@
package org.apache.spark.scheduler

import org.apache.spark.resource.ResourceProfile
import org.apache.spark.storage.BlockManagerId

/**
* A backend interface for scheduling systems that allows plugging in different ones under
@@ -92,4 +93,16 @@ private[spark] trait SchedulerBackend {
*/
def maxNumConcurrentTasks(rp: ResourceProfile): Int

/**
* Get the list of host locations for push-based shuffle.
*
* Currently push-based shuffle is disabled for both stage retry and stage reuse cases
* (e.g., when a few partitions are lost due to a failure). Hence this method
* should be invoked only once per ShuffleDependency.
* @return list of external shuffle service locations
*/
def getShufflePushMergerLocations(
numPartitions: Int,
resourceProfileId: Int): Seq[BlockManagerId] = Nil

}
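For concreteness, a hedged sketch of how a cluster-manager backend might implement this hook. This is not the actual YARN implementation (which lives outside this diff), and the config constants are private[spark], so it compiles only inside Spark's own tree:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.internal.config._
import org.apache.spark.storage.{BlockManagerId, BlockManagerMaster}

// Sketch: derive the minimum merger count from the two threshold configs,
// ask the BlockManagerMaster, and return Nil when the threshold is not met,
// which leaves push-based shuffle disabled for the stage.
def shufflePushMergerLocations(
    conf: SparkConf,
    master: BlockManagerMaster,
    numPartitions: Int): Seq[BlockManagerId] = {
  val minNeeded = math.max(
    numPartitions * conf.get(SHUFFLE_MERGER_LOCATIONS_MIN_THRESHOLD_RATIO),
    conf.get(SHUFFLE_MERGER_LOCATIONS_MIN_STATIC_THRESHOLD)).toInt
  val mergers = master.getShufflePushMergerLocations(minNeeded, Set.empty)
  if (mergers.size >= minNeeded) mergers else Nil
}
```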
core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala
@@ -145,4 +145,6 @@ private[spark] object BlockManagerId {
def getCachedBlockManagerId(id: BlockManagerId): BlockManagerId = {
blockManagerIdCache.get(id)
}

private[spark] val SHUFFLE_MERGER_IDENTIFIER = "shuffle-push-merger"
}
core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
@@ -125,6 +125,26 @@ class BlockManagerMaster(
driverEndpoint.askSync[Seq[BlockManagerId]](GetPeers(blockManagerId))
}

/**
* Get a list of unique shuffle service locations where an executor has successfully been
* registered in the past, for block push/merge with push-based shuffle.
*/
def getShufflePushMergerLocations(
numMergersNeeded: Int,
hostsToFilter: Set[String]): Seq[BlockManagerId] = {
driverEndpoint.askSync[Seq[BlockManagerId]](
GetShufflePushMergerLocations(numMergersNeeded, hostsToFilter))
}

/**
* Remove the host from the candidate list of shuffle push mergers. This can be
* triggered if there is a FetchFailedException on the host.
* @param host the host to remove from the candidate list of shuffle push mergers
*/
def removeShufflePushMergerLocation(host: String): Unit = {
driverEndpoint.askSync[Seq[BlockManagerId]](RemoveShufflePushMergerLocation(host))
}

def getExecutorEndpointRef(executorId: String): Option[RpcEndpointRef] = {
driverEndpoint.askSync[Option[RpcEndpointRef]](GetExecutorEndpointRef(executorId))
}
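A hypothetical driver-side call sequence (host name illustrative), showing how the two wrappers pair up after a fetch failure:

```scala
// Exclude a host that just produced fetch failures from the current request,
// and drop it from the candidate list for future requests as well.
val mergers = blockManagerMaster.getShufflePushMergerLocations(
  numMergersNeeded = 50, hostsToFilter = Set("bad-host.example.com"))
blockManagerMaster.removeShufflePushMergerLocation("bad-host.example.com")
```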
core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
@@ -74,6 +74,14 @@ class BlockManagerMasterEndpoint(
// Mapping from block id to the set of block managers that have the block.
private val blockLocations = new JHashMap[BlockId, mutable.HashSet[BlockManagerId]]

// Mapping from host name to the shuffle push merger service on that host, for hosts
// where the current app registered an executor in the past. The oldest entries are
// evicted once maxRetainedMergerLocations is reached, in favor of newer locations.
private val shuffleMergerLocations = new mutable.LinkedHashMap[String, BlockManagerId]()

// Maximum number of merger locations to cache
private val maxRetainedMergerLocations = conf.get(config.SHUFFLE_MERGER_MAX_RETAINED_LOCATIONS)

private val askThreadPool =
ThreadUtils.newDaemonCachedThreadPool("block-manager-ask-thread-pool", 100)
private implicit val askExecutionContext = ExecutionContext.fromExecutorService(askThreadPool)
@@ -92,6 +100,8 @@

val defaultRpcTimeout = RpcUtils.askRpcTimeout(conf)

private val pushBasedShuffleEnabled = Utils.isPushBasedShuffleEnabled(conf)

logInfo("BlockManagerMasterEndpoint up")
// same as `conf.get(config.SHUFFLE_SERVICE_ENABLED)
// && conf.get(config.SHUFFLE_SERVICE_FETCH_RDD_ENABLED)`
@@ -139,6 +149,12 @@
case GetBlockStatus(blockId, askStorageEndpoints) =>
context.reply(blockStatus(blockId, askStorageEndpoints))

case GetShufflePushMergerLocations(numMergersNeeded, hostsToFilter) =>
context.reply(getShufflePushMergerLocations(numMergersNeeded, hostsToFilter))

case RemoveShufflePushMergerLocation(host) =>
context.reply(removeShufflePushMergerLocation(host))

case IsExecutorAlive(executorId) =>
context.reply(blockManagerIdByExecutor.contains(executorId))

@@ -360,6 +376,17 @@

}

private def addMergerLocation(blockManagerId: BlockManagerId): Unit = {
if (!blockManagerId.isDriver && !shuffleMergerLocations.contains(blockManagerId.host)) {
val shuffleServerId = BlockManagerId(BlockManagerId.SHUFFLE_MERGER_IDENTIFIER,
blockManagerId.host, externalShuffleServicePort)
if (shuffleMergerLocations.size >= maxRetainedMergerLocations) {
shuffleMergerLocations -= shuffleMergerLocations.head._1
}
shuffleMergerLocations(shuffleServerId.host) = shuffleServerId
}
}

private def removeExecutor(execId: String): Unit = {
logInfo("Trying to remove executor " + execId + " from BlockManagerMaster.")
blockManagerIdByExecutor.get(execId).foreach(removeBlockManager)
@@ -526,6 +553,10 @@

blockManagerInfo(id) = new BlockManagerInfo(id, System.currentTimeMillis(),
maxOnHeapMemSize, maxOffHeapMemSize, storageEndpoint, externalShuffleServiceBlockStatus)

if (pushBasedShuffleEnabled) {
addMergerLocation(id)
}
}
listenerBus.post(SparkListenerBlockManagerAdded(time, id, maxOnHeapMemSize + maxOffHeapMemSize,
Some(maxOnHeapMemSize), Some(maxOffHeapMemSize)))
@@ -657,6 +688,40 @@
}
}

private def getShufflePushMergerLocations(
numMergersNeeded: Int,
hostsToFilter: Set[String]): Seq[BlockManagerId] = {
val blockManagerHosts = blockManagerIdByExecutor.values.map(_.host).toSet
val filteredBlockManagerHosts = blockManagerHosts.filterNot(hostsToFilter.contains(_))
val filteredMergersWithExecutors = filteredBlockManagerHosts.map(
BlockManagerId(BlockManagerId.SHUFFLE_MERGER_IDENTIFIER, _, externalShuffleServicePort))
// Enough mergers are available among the hosts with active executors
if (filteredMergersWithExecutors.size >= numMergersNeeded) {
filteredMergersWithExecutors.toSeq
} else {
// Top up the active-executor mergers with mergers from hosts without active executors
val filteredMergersWithExecutorsHosts = filteredMergersWithExecutors.map(_.host)
val filteredMergersWithoutExecutors = shuffleMergerLocations.values
.filterNot(x => hostsToFilter.contains(x.host))
.filterNot(x => filteredMergersWithExecutorsHosts.contains(x.host))
val randomFilteredMergersLocations =
if (filteredMergersWithoutExecutors.size >
numMergersNeeded - filteredMergersWithExecutors.size) {
Utils.randomize(filteredMergersWithoutExecutors)
.take(numMergersNeeded - filteredMergersWithExecutors.size)
} else {
filteredMergersWithoutExecutors
}
filteredMergersWithExecutors.toSeq ++ randomFilteredMergersLocations
}
}

private def removeShufflePushMergerLocation(host: String): Unit = {
if (shuffleMergerLocations.contains(host)) {
shuffleMergerLocations.remove(host)
}
}

/**
* Returns an [[RpcEndpointRef]] of the [[BlockManagerReplicaEndpoint]] for sending RPC messages.
*/
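The selection policy in getShufflePushMergerLocations can be summarized with a small self-contained toy (illustrative only; the real method works on BlockManagerIds and the LinkedHashMap of historical hosts shown earlier):

```scala
import scala.util.Random

// Prefer hosts with active executors; top up the remainder from historical
// merger hosts in random order, never duplicating an active host.
def pickMergers(active: Set[String], historical: Seq[String], needed: Int): Seq[String] = {
  val fromActive = active.toSeq
  if (fromActive.size >= needed) {
    fromActive
  } else {
    val backfill = Random.shuffle(historical.filterNot(active.contains))
      .take(needed - fromActive.size)
    fromActive ++ backfill
  }
}

val picked = pickMergers(Set("a", "b"), Seq("b", "c", "d"), needed = 3)
assert(picked.size == 3)
assert(picked.take(2).toSet == Set("a", "b"))  // active hosts always come first
```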
core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala
@@ -141,4 +141,10 @@ private[spark] object BlockManagerMessages {
case class BlockManagerHeartbeat(blockManagerId: BlockManagerId) extends ToBlockManagerMaster

case class IsExecutorAlive(executorId: String) extends ToBlockManagerMaster

case class GetShufflePushMergerLocations(numMergersNeeded: Int, hostsToFilter: Set[String])
extends ToBlockManagerMaster

case class RemoveShufflePushMergerLocation(host: String) extends ToBlockManagerMaster

}
8 changes: 8 additions & 0 deletions core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -2541,6 +2541,14 @@ private[spark] object Utils extends Logging {
master == "local" || master.startsWith("local[")
}

/**
* Push-based shuffle can only be enabled when the external shuffle service is enabled
* (IS_TESTING relaxes this requirement for unit tests).
*/
def isPushBasedShuffleEnabled(conf: SparkConf): Boolean = {
conf.get(PUSH_BASED_SHUFFLE_ENABLED) &&
(conf.get(IS_TESTING).getOrElse(false) || conf.get(SHUFFLE_SERVICE_ENABLED))
}

/**
* Return whether dynamic allocation is enabled in the given conf.
*/
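The gating above is easy to exercise in a test-style sketch (Utils is private[spark], so this only compiles inside Spark's own tree; the commit list above mentions a UtilsSuite test along these lines):

```scala
import org.apache.spark.SparkConf
import org.apache.spark.util.Utils

val conf = new SparkConf().set("spark.shuffle.push.enabled", "true")
// Without the external shuffle service (and outside spark.testing), it stays off:
assert(!Utils.isPushBasedShuffleEnabled(conf))
conf.set("spark.shuffle.service.enabled", "true")
assert(Utils.isPushBasedShuffleEnabled(conf))
```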