From 68fc36449cb481137184f7c4ec3c9d2d68302a17 Mon Sep 17 00:00:00 2001 From: Venkata krishnan Sowrirajan Date: Thu, 6 Aug 2020 09:42:19 -0700 Subject: [PATCH 01/21] [SPARK-32919][CORE] Driver side changes for coordinating push based shuffle by selecting external shuffle services for merging partitions --- .../scala/org/apache/spark/Dependency.scala | 25 +++++++++++++ .../spark/internal/config/package.scala | 27 ++++++++++++++ .../apache/spark/scheduler/DAGScheduler.scala | 30 ++++++++++++++++ .../spark/scheduler/SchedulerBackend.scala | 7 ++++ .../spark/storage/BlockManagerMaster.scala | 12 ++++++- .../storage/BlockManagerMasterEndpoint.scala | 30 ++++++++++++++++ .../spark/storage/BlockManagerMessages.scala | 4 +++ .../scala/org/apache/spark/util/Utils.scala | 11 +++++- .../spark/storage/BlockManagerSuite.scala | 25 +++++++++++-- .../cluster/YarnSchedulerBackend.scala | 36 +++++++++++++++++-- 10 files changed, 200 insertions(+), 7 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/Dependency.scala b/core/src/main/scala/org/apache/spark/Dependency.scala index ba8e4d69ba755..aee9407c990e2 100644 --- a/core/src/main/scala/org/apache/spark/Dependency.scala +++ b/core/src/main/scala/org/apache/spark/Dependency.scala @@ -23,6 +23,8 @@ import org.apache.spark.annotation.DeveloperApi import org.apache.spark.rdd.RDD import org.apache.spark.serializer.Serializer import org.apache.spark.shuffle.{ShuffleHandle, ShuffleWriteProcessor} +import org.apache.spark.storage.BlockManagerId +import org.apache.spark.util.Utils /** * :: DeveloperApi :: @@ -95,6 +97,29 @@ class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag]( val shuffleHandle: ShuffleHandle = _rdd.context.env.shuffleManager.registerShuffle( shuffleId, this) + // By default, shuffle merge is enabled for ShuffleDependency if push based shuffle is enabled + private[this] var _shuffleMergeEnabled = Utils.isPushBasedShuffleEnabled(rdd.sparkContext.getConf) + + def setShuffleMergeEnabled(shuffleMergeEnabled: Boolean): Unit = { + _shuffleMergeEnabled = shuffleMergeEnabled + } + + def shuffleMergeEnabled : Boolean = _shuffleMergeEnabled + + /** + * Stores the location of the list of chosen external shuffle services for handling the + * shuffle merge requests from mappers in this shuffle map stage. 
+ */ + private[this] var _mergerLocs: Seq[BlockManagerId] = Nil + + def setMergerLocs(mergerLocs: Seq[BlockManagerId]): Unit = { + if (mergerLocs != null && mergerLocs.length > 0) { + _mergerLocs = mergerLocs + } + } + + def getMergerLocs: Seq[BlockManagerId] = _mergerLocs + _rdd.sparkContext.cleaner.foreach(_.registerShuffleForCleanup(this)) _rdd.sparkContext.shuffleDriverComponents.registerShuffle(shuffleId) } diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala index 9a7039a9cfe93..fe10be125e5cf 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/package.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala @@ -1927,4 +1927,31 @@ package object config { .version("3.0.1") .booleanConf .createWithDefault(false) + + private[spark] val PUSH_BASED_SHUFFLE_ENABLED = + ConfigBuilder("spark.shuffle.push.based.enabled") + .doc("Set to 'true' to enable push based shuffle") + .booleanConf + .createWithDefault(false) + + private[spark] val MAX_MERGER_LOCATIONS_CACHED = + ConfigBuilder("spark.shuffle.push.retainedMergerLocations") + .doc("Max number of shuffle services hosts info cached to determine the locations of" + + " shuffle services when pushing the blocks.") + .intConf + .createWithDefault(500) + + private[spark] val MERGER_LOCATIONS_MIN_THRESHOLD_RATIO = + ConfigBuilder("spark.shuffle.push.mergerLocations.minThresholdRatio") + .doc("Minimum percentage of shuffle services (merger locations) should be available with" + + " respect to numPartitions in order to enable push based shuffle for a stage.") + .doubleConf + .createWithDefault(0.05) + + private[spark] val MERGER_LOCATIONS_MIN_STATIC_THRESHOLD = + ConfigBuilder("spark.shuffle.push.mergerLocations.minStaticThreshold") + .doc("Minimum number of shuffle services (merger locations) should be available in order" + + "to enable push based shuffle for a stage.") + .doubleConf + .createWithDefault(5) } diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 080e0e7f1552f..3a41c862cde9f 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -249,6 +249,8 @@ private[spark] class DAGScheduler( private[spark] val eventProcessLoop = new DAGSchedulerEventProcessLoop(this) taskScheduler.setDAGScheduler(this) + private val pushBasedShuffleEnabled = Utils.isPushBasedShuffleEnabled(sc.getConf) + /** * Called by the TaskSetManager to report task's starting. */ @@ -1252,6 +1254,28 @@ private[spark] class DAGScheduler( execCores.map(cores => properties.setProperty(EXECUTOR_CORES_LOCAL_PROPERTY, cores)) } + /** + * If push based shuffle is enabled, set the shuffle services to be used for the given + * shuffle map stage. The list of shuffle services is determined based on the list of + * active executors tracked by block manager master at the start of the stage. 
+ */ + private def prepareShuffleServicesForShuffleMapStage(stage: ShuffleMapStage) { + // TODO: Handle stage reuse/retry cases separately as without finalize changes we cannot + // TODO: disable shuffle merge for the retry/reuse cases + val mergerLocs = sc.schedulerBackend.getMergerLocations( + stage.shuffleDep.partitioner.numPartitions, stage.resourceProfileId) + logDebug(s"${stage.shuffleDep.getMergerLocs.map(_.host).mkString(", ")}") + + if (mergerLocs.nonEmpty) { + stage.shuffleDep.setMergerLocs(mergerLocs) + logInfo("Shuffle merge enabled for %s (%s) with %d merger locations" + .format(stage, stage.name, stage.shuffleDep.getMergerLocs.size)) + } else { + stage.shuffleDep.setShuffleMergeEnabled(false) + logInfo("Shuffle merge disabled for %s (%s)".format(stage, stage.name)) + } + } + /** Called when stage's parents are available and we can now do its task. */ private def submitMissingTasks(stage: Stage, jobId: Int): Unit = { logDebug("submitMissingTasks(" + stage + ")") @@ -1281,6 +1305,12 @@ private[spark] class DAGScheduler( stage match { case s: ShuffleMapStage => outputCommitCoordinator.stageStart(stage = s.id, maxPartitionId = s.numPartitions - 1) + // Only generate merger location for a given shuffle dependency once. This way, even if + // this stage gets retried, it would still be merging blocks using the same set of + // shuffle services. + if (s.shuffleDep.shuffleMergeEnabled) { + prepareShuffleServicesForShuffleMapStage(s) + } case s: ResultStage => outputCommitCoordinator.stageStart( stage = s.id, maxPartitionId = s.rdd.partitions.length - 1) diff --git a/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala index a566d0a04387c..3bcd4e6287f13 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala @@ -18,6 +18,7 @@ package org.apache.spark.scheduler import org.apache.spark.resource.ResourceProfile +import org.apache.spark.storage.BlockManagerId /** * A backend interface for scheduling systems that allows plugging in different ones under @@ -92,4 +93,10 @@ private[spark] trait SchedulerBackend { */ def maxNumConcurrentTasks(rp: ResourceProfile): Int + /** + * Get the list of both active and dead executors host locations for push based shuffle + * @return List of external shuffle services locations + */ + def getMergerLocations(numPartitions: Int, resourceProfileId: Int): Seq[BlockManagerId] = Nil + } diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala index f544d47b8e13c..88defe6635e11 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala @@ -24,7 +24,7 @@ import scala.concurrent.Future import org.apache.spark.{SparkConf, SparkException} import org.apache.spark.internal.Logging import org.apache.spark.rpc.RpcEndpointRef -import org.apache.spark.storage.BlockManagerMessages._ +import org.apache.spark.storage.BlockManagerMessages.{GetMergerLocations, _} import org.apache.spark.util.{RpcUtils, ThreadUtils} private[spark] @@ -125,6 +125,16 @@ class BlockManagerMaster( driverEndpoint.askSync[Seq[BlockManagerId]](GetPeers(blockManagerId)) } + /** + * Get list of shuffle service locations available for pushing the shuffle blocks + * with push based shuffle + */ + def getMergerLocations( + 
numMergersNeeded: Int, + hostsToFilter: Set[String]): Seq[BlockManagerId] = { + driverEndpoint.askSync[Seq[BlockManagerId]](GetMergerLocations(numMergersNeeded, hostsToFilter)) + } + def getExecutorEndpointRef(executorId: String): Option[RpcEndpointRef] = { driverEndpoint.askSync[Option[RpcEndpointRef]](GetExecutorEndpointRef(executorId)) } diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala index 569d7d32284bc..1968ab9f4ad52 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala @@ -74,6 +74,12 @@ class BlockManagerMasterEndpoint( // Mapping from block id to the set of block managers that have the block. private val blockLocations = new JHashMap[BlockId, mutable.HashSet[BlockManagerId]] + // Mapping from host name to shuffle (mergers) services + private val mergerLocations = new mutable.LinkedHashMap[String, BlockManagerId]() + + // Maximum number of merger locations to cache + private val maxRetainedMergerLocations = conf.get(config.MAX_MERGER_LOCATIONS_CACHED) + private val askThreadPool = ThreadUtils.newDaemonCachedThreadPool("block-manager-ask-thread-pool", 100) private implicit val askExecutionContext = ExecutionContext.fromExecutorService(askThreadPool) @@ -139,6 +145,9 @@ class BlockManagerMasterEndpoint( case GetBlockStatus(blockId, askStorageEndpoints) => context.reply(blockStatus(blockId, askStorageEndpoints)) + case GetMergerLocations(numMergersNeeded, hostsToFilter) => + context.reply(getMergerLocations(numMergersNeeded, hostsToFilter)) + case IsExecutorAlive(executorId) => context.reply(blockManagerIdByExecutor.contains(executorId)) @@ -360,6 +369,17 @@ class BlockManagerMasterEndpoint( } + private def addMergerLocation(blockManagerId: BlockManagerId): Unit = { + if (!mergerLocations.contains(blockManagerId.host) && !blockManagerId.isDriver) { + val shuffleServerId = BlockManagerId(blockManagerId.executorId, blockManagerId.host, + StorageUtils.externalShuffleServicePort(conf)) + if (mergerLocations.size >= maxRetainedMergerLocations) { + mergerLocations -= mergerLocations.head._1 + } + mergerLocations(shuffleServerId.host) = shuffleServerId + } + } + private def removeExecutor(execId: String): Unit = { logInfo("Trying to remove executor " + execId + " from BlockManagerMaster.") blockManagerIdByExecutor.get(execId).foreach(removeBlockManager) @@ -526,6 +546,8 @@ class BlockManagerMasterEndpoint( blockManagerInfo(id) = new BlockManagerInfo(id, System.currentTimeMillis(), maxOnHeapMemSize, maxOffHeapMemSize, storageEndpoint, externalShuffleServiceBlockStatus) + + addMergerLocation(id) } listenerBus.post(SparkListenerBlockManagerAdded(time, id, maxOnHeapMemSize + maxOffHeapMemSize, Some(maxOnHeapMemSize), Some(maxOffHeapMemSize))) @@ -657,6 +679,14 @@ class BlockManagerMasterEndpoint( } } + private def getMergerLocations( + numMergersNeeded: Int, + hostsToFilter: Set[String]): Seq[BlockManagerId] = { + // Copying the merger locations to a list so that the original mergerLocations won't be shuffled + val mergers = mergerLocations.values.filterNot(x => hostsToFilter.contains(x.host)).toSeq + Utils.randomize(mergers).take(numMergersNeeded) + } + /** * Returns an [[RpcEndpointRef]] of the [[BlockManagerReplicaEndpoint]] for sending RPC messages. 
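   *
   * (Aside on the merger cache added above: mutable.LinkedHashMap iterates in
   * insertion order, so evicting head drops the oldest host once the cap is hit.
   * A standalone sketch with illustrative values, not patch code:)
   * {{{
   *   val maxRetained = 2
   *   val cache = new scala.collection.mutable.LinkedHashMap[String, Int]()
   *   Seq("hostA" -> 7337, "hostB" -> 7337, "hostC" -> 7337).foreach { case (host, port) =>
   *     if (cache.size >= maxRetained) {
   *       cache -= cache.head._1 // head._1 is the oldest inserted host
   *     }
   *     cache(host) = port
   *   }
   *   // cache.keys: hostB, hostC -- hostA was evicted first
   * }}}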
   */
 }
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala
index bbc076cea9ba8..5daa183102263 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala
@@ -141,4 +141,8 @@ private[spark] object BlockManagerMessages {
   case class BlockManagerHeartbeat(blockManagerId: BlockManagerId) extends ToBlockManagerMaster
 
   case class IsExecutorAlive(executorId: String) extends ToBlockManagerMaster
+
+  case class GetMergerLocations(numMergersNeeded: Int, hostsToFilter: Set[String])
+    extends ToBlockManagerMaster
+
 }
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index b8b044bbad30e..d874f40157336 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -28,7 +28,7 @@ import java.nio.channels.{Channels, FileChannel, WritableByteChannel}
 import java.nio.charset.StandardCharsets
 import java.nio.file.Files
 import java.security.SecureRandom
-import java.util.{Arrays, Locale, Properties, Random, UUID}
+import java.util.{Locale, Properties, Random, UUID}
 import java.util.concurrent._
 import java.util.concurrent.TimeUnit.NANOSECONDS
 import java.util.zip.GZIPInputStream
@@ -2541,6 +2541,15 @@ private[spark] object Utils extends Logging {
     master == "local" || master.startsWith("local[")
   }
 
+  /**
+   * Push based shuffle can only be enabled when the external shuffle service is enabled.
+   * In the initial version, we cannot support push based shuffle and adaptive execution
+   * at the same time. Will improve this in a later version.
+   */
+  def isPushBasedShuffleEnabled(conf: SparkConf): Boolean = {
+    conf.get(PUSH_BASED_SHUFFLE_ENABLED) && conf.get(SHUFFLE_SERVICE_ENABLED)
+  }
+
   /**
    * Return whether dynamic allocation is enabled in the given conf.
*/ diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala index 5450a4b67c00b..6d4ba87995b96 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala @@ -1974,7 +1974,28 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE } } - class MockBlockTransferService(val maxFailures: Int) extends BlockTransferService { + test("mergerLocations should be bounded with in" + + " spark.shuffle.push.retainedMergerLocations") { + assert(master.getMergerLocations(10, Set.empty).isEmpty) + makeBlockManager(100, "execA", + transferService = Some(new MockBlockTransferService(10, "hostA"))) + makeBlockManager(100, "execB", + transferService = Some(new MockBlockTransferService(10, "hostB"))) + makeBlockManager(100, "execC", + transferService = Some(new MockBlockTransferService(10, "hostC"))) + makeBlockManager(100, "execD", + transferService = Some(new MockBlockTransferService(10, "hostD"))) + makeBlockManager(100, "execE", + transferService = Some(new MockBlockTransferService(10, "hostA"))) + assert(master.getMergerLocations(10, Set.empty).size == 4) + assert(master.getMergerLocations(10, Set.empty) + .exists(x => Seq("hostC", "hostD", "hostA", "hostB").contains(x.host))) + assert(master.getMergerLocations(10, Set("hostB")).size == 3) + } + + class MockBlockTransferService( + val maxFailures: Int, + hostname: String = "MockBlockTransferServiceHost") extends BlockTransferService { var numCalls = 0 var tempFileManager: DownloadFileManager = null @@ -1992,7 +2013,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE override def close(): Unit = {} - override def hostName: String = { "MockBlockTransferServiceHost" } + override def hostName: String = { hostname } override def port: Int = { 63332 } diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala index 3f2e8846e85b3..b15f8b5840d83 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala @@ -18,7 +18,7 @@ package org.apache.spark.scheduler.cluster import java.util.EnumSet -import java.util.concurrent.atomic.{AtomicBoolean} +import java.util.concurrent.atomic.AtomicBoolean import javax.servlet.DispatcherType import scala.concurrent.{ExecutionContext, Future} @@ -29,13 +29,14 @@ import org.apache.hadoop.yarn.api.records.{ApplicationAttemptId, ApplicationId} import org.apache.spark.SparkContext import org.apache.spark.deploy.security.HadoopDelegationTokenManager -import org.apache.spark.internal.Logging -import org.apache.spark.internal.config +import org.apache.spark.internal.{config, Logging} +import org.apache.spark.internal.config.DYN_ALLOCATION_MAX_EXECUTORS import org.apache.spark.internal.config.UI._ import org.apache.spark.resource.ResourceProfile import org.apache.spark.rpc._ import org.apache.spark.scheduler._ import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._ +import org.apache.spark.storage.{BlockManagerId, BlockManagerMaster} import org.apache.spark.util.{RpcUtils, ThreadUtils} /** @@ -80,6 +81,14 @@ private[spark] abstract class YarnSchedulerBackend( /** 
Attempt ID. This is unset for client-mode schedulers */ private var attemptId: Option[ApplicationAttemptId] = None + private val blockManagerMaster: BlockManagerMaster = sc.env.blockManager.master + + private val minMergersThresholdRatio = conf.get(config.MERGER_LOCATIONS_MIN_THRESHOLD_RATIO) + + private val minMergersStaticThreshold = conf.get(config.MERGER_LOCATIONS_MIN_STATIC_THRESHOLD) + + private val maxNumExecutors = conf.get(DYN_ALLOCATION_MAX_EXECUTORS) + /** * Bind to YARN. This *must* be done before calling [[start()]]. * @@ -161,6 +170,27 @@ private[spark] abstract class YarnSchedulerBackend( totalRegisteredExecutors.get() >= totalExpectedExecutors * minRegisteredRatio } + override def getMergerLocations( + numPartitions: Int, + resourceProfileId: Int): Seq[BlockManagerId] = { + // Currently this is naive way of calculating numMergersNeeded for a stage. In future, + // we can use better heuristics to calculate numMergersNeeded for a stage. + val tasksPerExecutor = sc.resourceProfileManager + .resourceProfileFromId(resourceProfileId).maxTasksPerExecutor(sc.conf) + val numMergersNeeded = math.min( + math.max(1, math.ceil(numPartitions / tasksPerExecutor).toInt), maxNumExecutors) + val minMergersThreshold = math.max(minMergersStaticThreshold, + math.floor(numMergersNeeded * minMergersThresholdRatio).toInt) + val mergerLocations = blockManagerMaster + .getMergerLocations(numMergersNeeded, scheduler.nodeBlacklist()) + logDebug(s"Num merger locations available ${mergerLocations.length}") + if (mergerLocations.size < numMergersNeeded && mergerLocations.size < minMergersThreshold) { + Seq.empty[BlockManagerId] + } else { + mergerLocations + } + } + /** * Add filters to the SparkUI. */ From 2688df2780b94785f08f665cef566ba4caef3e66 Mon Sep 17 00:00:00 2001 From: Min Shen Date: Tue, 27 Oct 2020 16:01:59 -0700 Subject: [PATCH 02/21] Empty commit to add Min Shen to authors list From 0423970aaeb992201942ff4e82e64476283ca568 Mon Sep 17 00:00:00 2001 From: Venkata krishnan Sowrirajan Date: Mon, 2 Nov 2020 09:44:10 -0800 Subject: [PATCH 03/21] Addressed some of the comments both from Mridul and Thomas --- .../scala/org/apache/spark/Dependency.scala | 4 +-- .../spark/internal/config/package.scala | 21 +++++++++------- .../apache/spark/scheduler/DAGScheduler.scala | 12 ++++++--- .../spark/scheduler/SchedulerBackend.scala | 8 +++++- .../spark/storage/BlockManagerMaster.scala | 11 ++++---- .../storage/BlockManagerMasterEndpoint.scala | 25 +++++++++++-------- .../spark/storage/BlockManagerMessages.scala | 2 +- .../spark/storage/BlockManagerSuite.scala | 8 +++--- .../cluster/YarnSchedulerBackend.scala | 18 +++++++------ 9 files changed, 65 insertions(+), 44 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/Dependency.scala b/core/src/main/scala/org/apache/spark/Dependency.scala index aee9407c990e2..9dbc213399e70 100644 --- a/core/src/main/scala/org/apache/spark/Dependency.scala +++ b/core/src/main/scala/org/apache/spark/Dependency.scala @@ -98,7 +98,7 @@ class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag]( shuffleId, this) // By default, shuffle merge is enabled for ShuffleDependency if push based shuffle is enabled - private[this] var _shuffleMergeEnabled = Utils.isPushBasedShuffleEnabled(rdd.sparkContext.getConf) + private[spark] var _shuffleMergeEnabled = Utils.isPushBasedShuffleEnabled(rdd.sparkContext.getConf) def setShuffleMergeEnabled(shuffleMergeEnabled: Boolean): Unit = { _shuffleMergeEnabled = shuffleMergeEnabled @@ -110,7 +110,7 @@ class ShuffleDependency[K: 
ClassTag, V: ClassTag, C: ClassTag]( * Stores the location of the list of chosen external shuffle services for handling the * shuffle merge requests from mappers in this shuffle map stage. */ - private[this] var _mergerLocs: Seq[BlockManagerId] = Nil + private[spark] var _mergerLocs: Seq[BlockManagerId] = Nil def setMergerLocs(mergerLocs: Seq[BlockManagerId]): Unit = { if (mergerLocs != null && mergerLocs.length > 0) { diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala index fe10be125e5cf..5f2b3c8c444ba 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/package.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala @@ -1929,29 +1929,32 @@ package object config { .createWithDefault(false) private[spark] val PUSH_BASED_SHUFFLE_ENABLED = - ConfigBuilder("spark.shuffle.push.based.enabled") + ConfigBuilder("spark.shuffle.push.enabled") .doc("Set to 'true' to enable push based shuffle") .booleanConf .createWithDefault(false) private[spark] val MAX_MERGER_LOCATIONS_CACHED = ConfigBuilder("spark.shuffle.push.retainedMergerLocations") - .doc("Max number of shuffle services hosts info cached to determine the locations of" + - " shuffle services when pushing the blocks.") + .doc("Maximum number of shuffle push mergers locations cached for push based shuffle." + + "Currently Shuffle push merger locations are nothing but shuffle services where an" + + "executor is launched in the case of Push based shuffle.") .intConf .createWithDefault(500) private[spark] val MERGER_LOCATIONS_MIN_THRESHOLD_RATIO = - ConfigBuilder("spark.shuffle.push.mergerLocations.minThresholdRatio") - .doc("Minimum percentage of shuffle services (merger locations) should be available with" + - " respect to numPartitions in order to enable push based shuffle for a stage.") + ConfigBuilder("spark.shuffle.push.mergersMinThresholdRatio") + .doc("Minimum percentage of shuffle push mergers locations required to enable push based" + + "shuffle for the stage with respect to number of partitions of the child stage. This is" + + " the number of unique Node Manager locations needed to enable push based shuffle.") .doubleConf .createWithDefault(0.05) private[spark] val MERGER_LOCATIONS_MIN_STATIC_THRESHOLD = - ConfigBuilder("spark.shuffle.push.mergerLocations.minStaticThreshold") - .doc("Minimum number of shuffle services (merger locations) should be available in order" + - "to enable push based shuffle for a stage.") + ConfigBuilder("spark.shuffle.push.mergersMinStaticThreshold") + .doc("Minimum static number of of shuffle push mergers locations should be available in" + + " order to enable push based shuffle for a stage. Note this config works in" + + " conjunction with spark.shuffle.push.mergersMinThresholdRatio") .doubleConf .createWithDefault(5) } diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 3a41c862cde9f..0c03c34419783 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -1256,15 +1256,19 @@ private[spark] class DAGScheduler( /** * If push based shuffle is enabled, set the shuffle services to be used for the given - * shuffle map stage. The list of shuffle services is determined based on the list of - * active executors tracked by block manager master at the start of the stage. 
+ * shuffle map stage for block push/merge. + * + * Even with DRA kicking in and significantly reducing the number of available active + * executors, we would still be able to get sufficient shuffle service locations for + * block push/merge by getting the historical locations of past executors. */ private def prepareShuffleServicesForShuffleMapStage(stage: ShuffleMapStage) { // TODO: Handle stage reuse/retry cases separately as without finalize changes we cannot // TODO: disable shuffle merge for the retry/reuse cases - val mergerLocs = sc.schedulerBackend.getMergerLocations( + val mergerLocs = sc.schedulerBackend.getShufflePushMergerLocations( stage.shuffleDep.partitioner.numPartitions, stage.resourceProfileId) - logDebug(s"${stage.shuffleDep.getMergerLocs.map(_.host).mkString(", ")}") + logDebug(s"List of shuffle push merger locations " + + s"${stage.shuffleDep.getMergerLocs.map(_.host).mkString(", ")}") if (mergerLocs.nonEmpty) { stage.shuffleDep.setMergerLocs(mergerLocs) diff --git a/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala index 3bcd4e6287f13..b27eb243c6535 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala @@ -95,8 +95,14 @@ private[spark] trait SchedulerBackend { /** * Get the list of both active and dead executors host locations for push based shuffle + * + * Currently push based shuffle is disabled for both stage retry and stage reuse cases + * (for eg: in the case where few partitions are lost due to failure). Hence this method + * should be invoked only once for a ShuffleDependency. * @return List of external shuffle services locations */ - def getMergerLocations(numPartitions: Int, resourceProfileId: Int): Seq[BlockManagerId] = Nil + def getShufflePushMergerLocations( + numPartitions: Int, + resourceProfileId: Int): Seq[BlockManagerId] = Nil } diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala index 88defe6635e11..3531d21cda6e1 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala @@ -24,7 +24,7 @@ import scala.concurrent.Future import org.apache.spark.{SparkConf, SparkException} import org.apache.spark.internal.Logging import org.apache.spark.rpc.RpcEndpointRef -import org.apache.spark.storage.BlockManagerMessages.{GetMergerLocations, _} +import org.apache.spark.storage.BlockManagerMessages.{GetShufflePushMergerLocations, _} import org.apache.spark.util.{RpcUtils, ThreadUtils} private[spark] @@ -126,13 +126,14 @@ class BlockManagerMaster( } /** - * Get list of shuffle service locations available for pushing the shuffle blocks - * with push based shuffle + * Get list of unique shuffle service locations where an executor is successfully + * registered in the past for block push/merge with push based shuffle. 
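+   *
+   * A hypothetical call, asking for up to 20 merger locations while excluding a
+   * misbehaving host:
+   * {{{
+   *   val mergers = master.getShufflePushMergerLocations(
+   *     numMergersNeeded = 20, hostsToFilter = Set("bad-host"))
+   * }}}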
*/ - def getMergerLocations( + def getShufflePushMergerLocations( numMergersNeeded: Int, hostsToFilter: Set[String]): Seq[BlockManagerId] = { - driverEndpoint.askSync[Seq[BlockManagerId]](GetMergerLocations(numMergersNeeded, hostsToFilter)) + driverEndpoint.askSync[Seq[BlockManagerId]]( + GetShufflePushMergerLocations(numMergersNeeded, hostsToFilter)) } def getExecutorEndpointRef(executorId: String): Option[RpcEndpointRef] = { diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala index 1968ab9f4ad52..5710d07873246 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala @@ -74,8 +74,10 @@ class BlockManagerMasterEndpoint( // Mapping from block id to the set of block managers that have the block. private val blockLocations = new JHashMap[BlockId, mutable.HashSet[BlockManagerId]] - // Mapping from host name to shuffle (mergers) services - private val mergerLocations = new mutable.LinkedHashMap[String, BlockManagerId]() + // Mapping from host name to shuffle (mergers) services where the current app + // registered an executor in the past. Older hosts are removed when the + // maxRetainedMergerLocations size is reached in favor of newer locations. + private val shuffleMergerLocations = new mutable.LinkedHashMap[String, BlockManagerId]() // Maximum number of merger locations to cache private val maxRetainedMergerLocations = conf.get(config.MAX_MERGER_LOCATIONS_CACHED) @@ -145,8 +147,8 @@ class BlockManagerMasterEndpoint( case GetBlockStatus(blockId, askStorageEndpoints) => context.reply(blockStatus(blockId, askStorageEndpoints)) - case GetMergerLocations(numMergersNeeded, hostsToFilter) => - context.reply(getMergerLocations(numMergersNeeded, hostsToFilter)) + case GetShufflePushMergerLocations(numMergersNeeded, hostsToFilter) => + context.reply(getShufflePushMergerLocations(numMergersNeeded, hostsToFilter)) case IsExecutorAlive(executorId) => context.reply(blockManagerIdByExecutor.contains(executorId)) @@ -370,13 +372,13 @@ class BlockManagerMasterEndpoint( } private def addMergerLocation(blockManagerId: BlockManagerId): Unit = { - if (!mergerLocations.contains(blockManagerId.host) && !blockManagerId.isDriver) { + if (!shuffleMergerLocations.contains(blockManagerId.host) && !blockManagerId.isDriver) { val shuffleServerId = BlockManagerId(blockManagerId.executorId, blockManagerId.host, StorageUtils.externalShuffleServicePort(conf)) - if (mergerLocations.size >= maxRetainedMergerLocations) { - mergerLocations -= mergerLocations.head._1 + if (shuffleMergerLocations.size >= maxRetainedMergerLocations) { + shuffleMergerLocations -= shuffleMergerLocations.head._1 } - mergerLocations(shuffleServerId.host) = shuffleServerId + shuffleMergerLocations(shuffleServerId.host) = shuffleServerId } } @@ -679,11 +681,12 @@ class BlockManagerMasterEndpoint( } } - private def getMergerLocations( + private def getShufflePushMergerLocations( numMergersNeeded: Int, hostsToFilter: Set[String]): Seq[BlockManagerId] = { - // Copying the merger locations to a list so that the original mergerLocations won't be shuffled - val mergers = mergerLocations.values.filterNot(x => hostsToFilter.contains(x.host)).toSeq + // Copying the merger locations to a list so that the original + // shuffleMergerLocations won't be shuffled + val mergers = shuffleMergerLocations.values.filterNot(x => 
hostsToFilter.contains(x.host)).toSeq Utils.randomize(mergers).take(numMergersNeeded) } diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala index 5daa183102263..098c98d9f6dd2 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala @@ -142,7 +142,7 @@ private[spark] object BlockManagerMessages { case class IsExecutorAlive(executorId: String) extends ToBlockManagerMaster - case class GetMergerLocations(numMergersNeeded: Int, hostsToFilter: Set[String]) + case class GetShufflePushMergerLocations(numMergersNeeded: Int, hostsToFilter: Set[String]) extends ToBlockManagerMaster } diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala index 6d4ba87995b96..57cba15c187f9 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala @@ -1976,7 +1976,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE test("mergerLocations should be bounded with in" + " spark.shuffle.push.retainedMergerLocations") { - assert(master.getMergerLocations(10, Set.empty).isEmpty) + assert(master.getShufflePushMergerLocations(10, Set.empty).isEmpty) makeBlockManager(100, "execA", transferService = Some(new MockBlockTransferService(10, "hostA"))) makeBlockManager(100, "execB", @@ -1987,10 +1987,10 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE transferService = Some(new MockBlockTransferService(10, "hostD"))) makeBlockManager(100, "execE", transferService = Some(new MockBlockTransferService(10, "hostA"))) - assert(master.getMergerLocations(10, Set.empty).size == 4) - assert(master.getMergerLocations(10, Set.empty) + assert(master.getShufflePushMergerLocations(10, Set.empty).size == 4) + assert(master.getShufflePushMergerLocations(10, Set.empty) .exists(x => Seq("hostC", "hostD", "hostA", "hostB").contains(x.host))) - assert(master.getMergerLocations(10, Set("hostB")).size == 3) + assert(master.getShufflePushMergerLocations(10, Set("hostB")).size == 3) } class MockBlockTransferService( diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala index b15f8b5840d83..2fb742dc6ab18 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala @@ -19,17 +19,16 @@ package org.apache.spark.scheduler.cluster import java.util.EnumSet import java.util.concurrent.atomic.AtomicBoolean + import javax.servlet.DispatcherType import scala.concurrent.{ExecutionContext, Future} import scala.util.{Failure, Success} import scala.util.control.NonFatal - import org.apache.hadoop.yarn.api.records.{ApplicationAttemptId, ApplicationId} - import org.apache.spark.SparkContext import org.apache.spark.deploy.security.HadoopDelegationTokenManager -import org.apache.spark.internal.{config, Logging} +import org.apache.spark.internal.{Logging, config} import org.apache.spark.internal.config.DYN_ALLOCATION_MAX_EXECUTORS import org.apache.spark.internal.config.UI._ import 
org.apache.spark.resource.ResourceProfile @@ -37,7 +36,7 @@ import org.apache.spark.rpc._ import org.apache.spark.scheduler._ import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._ import org.apache.spark.storage.{BlockManagerId, BlockManagerMaster} -import org.apache.spark.util.{RpcUtils, ThreadUtils} +import org.apache.spark.util.{RpcUtils, ThreadUtils, Utils} /** * Abstract Yarn scheduler backend that contains common logic @@ -170,19 +169,24 @@ private[spark] abstract class YarnSchedulerBackend( totalRegisteredExecutors.get() >= totalExpectedExecutors * minRegisteredRatio } - override def getMergerLocations( + override def getShufflePushMergerLocations( numPartitions: Int, resourceProfileId: Int): Seq[BlockManagerId] = { // Currently this is naive way of calculating numMergersNeeded for a stage. In future, // we can use better heuristics to calculate numMergersNeeded for a stage. + val maxExecutors = if (Utils.isDynamicAllocationEnabled(sc.getConf)) { + maxNumExecutors + } else { + Int.MaxValue + } val tasksPerExecutor = sc.resourceProfileManager .resourceProfileFromId(resourceProfileId).maxTasksPerExecutor(sc.conf) val numMergersNeeded = math.min( - math.max(1, math.ceil(numPartitions / tasksPerExecutor).toInt), maxNumExecutors) + math.max(1, math.ceil(numPartitions / tasksPerExecutor).toInt), maxExecutors) val minMergersThreshold = math.max(minMergersStaticThreshold, math.floor(numMergersNeeded * minMergersThresholdRatio).toInt) val mergerLocations = blockManagerMaster - .getMergerLocations(numMergersNeeded, scheduler.nodeBlacklist()) + .getShufflePushMergerLocations(numMergersNeeded, scheduler.nodeBlacklist()) logDebug(s"Num merger locations available ${mergerLocations.length}") if (mergerLocations.size < numMergersNeeded && mergerLocations.size < minMergersThreshold) { Seq.empty[BlockManagerId] From eb93fe17620e72b147080f8563683dacb44b45c0 Mon Sep 17 00:00:00 2001 From: Venkata krishnan Sowrirajan Date: Mon, 2 Nov 2020 17:09:45 -0800 Subject: [PATCH 04/21] Address style check issues --- core/src/main/scala/org/apache/spark/Dependency.scala | 3 ++- .../spark/scheduler/cluster/YarnSchedulerBackend.scala | 5 +++-- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/Dependency.scala b/core/src/main/scala/org/apache/spark/Dependency.scala index 9dbc213399e70..b286468de277a 100644 --- a/core/src/main/scala/org/apache/spark/Dependency.scala +++ b/core/src/main/scala/org/apache/spark/Dependency.scala @@ -98,7 +98,8 @@ class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag]( shuffleId, this) // By default, shuffle merge is enabled for ShuffleDependency if push based shuffle is enabled - private[spark] var _shuffleMergeEnabled = Utils.isPushBasedShuffleEnabled(rdd.sparkContext.getConf) + private[spark] var _shuffleMergeEnabled = + Utils.isPushBasedShuffleEnabled(rdd.sparkContext.getConf) def setShuffleMergeEnabled(shuffleMergeEnabled: Boolean): Unit = { _shuffleMergeEnabled = shuffleMergeEnabled diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala index 2fb742dc6ab18..72d4040b833c7 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala @@ -19,16 +19,17 @@ package org.apache.spark.scheduler.cluster 
import java.util.EnumSet import java.util.concurrent.atomic.AtomicBoolean - import javax.servlet.DispatcherType import scala.concurrent.{ExecutionContext, Future} import scala.util.{Failure, Success} import scala.util.control.NonFatal + import org.apache.hadoop.yarn.api.records.{ApplicationAttemptId, ApplicationId} + import org.apache.spark.SparkContext import org.apache.spark.deploy.security.HadoopDelegationTokenManager -import org.apache.spark.internal.{Logging, config} +import org.apache.spark.internal.{config, Logging} import org.apache.spark.internal.config.DYN_ALLOCATION_MAX_EXECUTORS import org.apache.spark.internal.config.UI._ import org.apache.spark.resource.ResourceProfile From f2f61e22dc8f86c144f2ddb8b11b851073cb2c3b Mon Sep 17 00:00:00 2001 From: Venkata krishnan Sowrirajan Date: Tue, 3 Nov 2020 11:31:52 -0800 Subject: [PATCH 05/21] Address style checks and Mridul's comment --- .../spark/scheduler/cluster/YarnSchedulerBackend.scala | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala index 72d4040b833c7..2306271def28e 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala @@ -30,7 +30,7 @@ import org.apache.hadoop.yarn.api.records.{ApplicationAttemptId, ApplicationId} import org.apache.spark.SparkContext import org.apache.spark.deploy.security.HadoopDelegationTokenManager import org.apache.spark.internal.{config, Logging} -import org.apache.spark.internal.config.DYN_ALLOCATION_MAX_EXECUTORS +import org.apache.spark.internal.config.{DYN_ALLOCATION_MAX_EXECUTORS, EXECUTOR_INSTANCES} import org.apache.spark.internal.config.UI._ import org.apache.spark.resource.ResourceProfile import org.apache.spark.rpc._ @@ -89,6 +89,8 @@ private[spark] abstract class YarnSchedulerBackend( private val maxNumExecutors = conf.get(DYN_ALLOCATION_MAX_EXECUTORS) + private val numExecutors = conf.get(EXECUTOR_INSTANCES).getOrElse(0) + /** * Bind to YARN. This *must* be done before calling [[start()]]. 
* @@ -178,7 +180,7 @@ private[spark] abstract class YarnSchedulerBackend( val maxExecutors = if (Utils.isDynamicAllocationEnabled(sc.getConf)) { maxNumExecutors } else { - Int.MaxValue + numExecutors } val tasksPerExecutor = sc.resourceProfileManager .resourceProfileFromId(resourceProfileId).maxTasksPerExecutor(sc.conf) From 3a6219f8e67ac1fea5a2fc1302477f0eb661db69 Mon Sep 17 00:00:00 2001 From: Venkata krishnan Sowrirajan Date: Tue, 3 Nov 2020 11:34:47 -0800 Subject: [PATCH 06/21] Address Tom's comment --- .../org/apache/spark/storage/BlockManagerMasterEndpoint.scala | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala index 5710d07873246..42072f9894ca2 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala @@ -684,10 +684,8 @@ class BlockManagerMasterEndpoint( private def getShufflePushMergerLocations( numMergersNeeded: Int, hostsToFilter: Set[String]): Seq[BlockManagerId] = { - // Copying the merger locations to a list so that the original - // shuffleMergerLocations won't be shuffled val mergers = shuffleMergerLocations.values.filterNot(x => hostsToFilter.contains(x.host)).toSeq - Utils.randomize(mergers).take(numMergersNeeded) + mergers.take(numMergersNeeded) } /** From ca44d0394c60706fc1515118ec7e1d0a697c2eda Mon Sep 17 00:00:00 2001 From: Venkata krishnan Sowrirajan Date: Wed, 4 Nov 2020 17:48:48 -0800 Subject: [PATCH 07/21] Prefer active executors for merger locations --- .../storage/BlockManagerMasterEndpoint.scala | 19 +++++++++++++-- .../spark/storage/BlockManagerSuite.scala | 23 +++++++++++++++++++ 2 files changed, 40 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala index 42072f9894ca2..d1bf53755f94f 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala @@ -684,8 +684,23 @@ class BlockManagerMasterEndpoint( private def getShufflePushMergerLocations( numMergersNeeded: Int, hostsToFilter: Set[String]): Seq[BlockManagerId] = { - val mergers = shuffleMergerLocations.values.filterNot(x => hostsToFilter.contains(x.host)).toSeq - mergers.take(numMergersNeeded) + val activeBlockManagers = blockManagerIdByExecutor.groupBy(_._2.host) + .mapValues(_.head).values.map(_._2).toSet + val filteredActiveBlockManagers = activeBlockManagers + .filterNot(x => hostsToFilter.contains(x.host)) + val filteredActiveMergers = filteredActiveBlockManagers.map( + x => BlockManagerId(x.executorId, x.host, StorageUtils.externalShuffleServicePort(conf))) + + // Enough mergers are available as part of active executors list + if (filteredActiveMergers.size >= numMergersNeeded) { + filteredActiveMergers.toSeq + } else { + // Delta mergers added from inactive mergers list to the active mergers list + val filteredDeadMergers = shuffleMergerLocations.values + .filterNot(mergerHost => filteredActiveMergers.exists(x => x.host == mergerHost.host)) + filteredActiveMergers.toSeq ++ + filteredDeadMergers.toSeq.take(numMergersNeeded - filteredActiveMergers.size) + } } /** diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala 
b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala index 57cba15c187f9..34b0bf0d2fc16 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala @@ -1993,6 +1993,29 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE assert(master.getShufflePushMergerLocations(10, Set("hostB")).size == 3) } + test("Prefer active executors locations for shuffle push mergers") { + makeBlockManager(100, "execA", + transferService = Some(new MockBlockTransferService(10, "hostA"))) + makeBlockManager(100, "execB", + transferService = Some(new MockBlockTransferService(10, "hostB"))) + makeBlockManager(100, "execC", + transferService = Some(new MockBlockTransferService(10, "hostC"))) + makeBlockManager(100, "execD", + transferService = Some(new MockBlockTransferService(10, "hostD"))) + makeBlockManager(100, "execE", + transferService = Some(new MockBlockTransferService(10, "hostA"))) + assert(master.getShufflePushMergerLocations(5, Set.empty).size == 4) + + master.removeExecutor("execA") + master.removeExecutor("execE") + + assert(master.getShufflePushMergerLocations(3, Set.empty).size == 3) + val expectedHosts = Set("hostB", "hostC", "hostD") + val shufflePushMergers = master + .getShufflePushMergerLocations(3, Set.empty).map(x => x.host).toSet + assert(expectedHosts.forall(x => shufflePushMergers.contains(x))) + } + class MockBlockTransferService( val maxFailures: Int, hostname: String = "MockBlockTransferServiceHost") extends BlockTransferService { From 2172f65261df17f8c41caddcc9436fff8e15b4d2 Mon Sep 17 00:00:00 2001 From: Venkata krishnan Sowrirajan Date: Thu, 5 Nov 2020 11:09:51 -0800 Subject: [PATCH 08/21] Addressed comments from ngone51 and otterc --- .../main/scala/org/apache/spark/Dependency.scala | 12 ++++++------ .../apache/spark/internal/config/package.scala | 6 +++++- .../org/apache/spark/scheduler/DAGScheduler.scala | 15 +++++++-------- .../apache/spark/storage/BlockManagerMaster.scala | 2 +- .../storage/BlockManagerMasterEndpoint.scala | 2 +- .../apache/spark/storage/BlockManagerSuite.scala | 2 +- .../scheduler/cluster/YarnSchedulerBackend.scala | 2 +- 7 files changed, 22 insertions(+), 19 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/Dependency.scala b/core/src/main/scala/org/apache/spark/Dependency.scala index b286468de277a..2f685ba974ebe 100644 --- a/core/src/main/scala/org/apache/spark/Dependency.scala +++ b/core/src/main/scala/org/apache/spark/Dependency.scala @@ -98,28 +98,28 @@ class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag]( shuffleId, this) // By default, shuffle merge is enabled for ShuffleDependency if push based shuffle is enabled - private[spark] var _shuffleMergeEnabled = + private[spark] var shuffleMergeEnabled = Utils.isPushBasedShuffleEnabled(rdd.sparkContext.getConf) def setShuffleMergeEnabled(shuffleMergeEnabled: Boolean): Unit = { - _shuffleMergeEnabled = shuffleMergeEnabled + this.shuffleMergeEnabled = shuffleMergeEnabled } - def shuffleMergeEnabled : Boolean = _shuffleMergeEnabled + def isShuffleMergeEnabled : Boolean = shuffleMergeEnabled /** * Stores the location of the list of chosen external shuffle services for handling the * shuffle merge requests from mappers in this shuffle map stage. 
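   *
   * Hypothetical usage (values illustrative; 7337 is the default external shuffle
   * service port):
   * {{{
   *   dep.setMergerLocs(Seq(BlockManagerId("exec1", "host1", 7337)))
   *   assert(dep.getMergerLocs.map(_.host) == Seq("host1"))
   * }}}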
*/ - private[spark] var _mergerLocs: Seq[BlockManagerId] = Nil + private[spark] var mergerLocs: Seq[BlockManagerId] = Nil def setMergerLocs(mergerLocs: Seq[BlockManagerId]): Unit = { if (mergerLocs != null && mergerLocs.length > 0) { - _mergerLocs = mergerLocs + this.mergerLocs = mergerLocs } } - def getMergerLocs: Seq[BlockManagerId] = _mergerLocs + def getMergerLocs: Seq[BlockManagerId] = mergerLocs _rdd.sparkContext.cleaner.foreach(_.registerShuffleForCleanup(this)) _rdd.sparkContext.shuffleDriverComponents.registerShuffle(shuffleId) diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala index 5f2b3c8c444ba..ffe40451e10f5 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/package.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala @@ -1930,7 +1930,11 @@ package object config { private[spark] val PUSH_BASED_SHUFFLE_ENABLED = ConfigBuilder("spark.shuffle.push.enabled") - .doc("Set to 'true' to enable push based shuffle") + .doc("Set to 'true' to enable push based shuffle on the client side and this works in" + + "conjunction with the server side flag spark.shuffle.server.mergedShuffleFileManagerImpl" + + "which needs to be set with the appropriate" + + "org.apache.spark.network.shuffle.MergedShuffleFileManager implementation for push-based" + + "shuffle to be enabled") .booleanConf .createWithDefault(false) diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 0c03c34419783..c8a9298de73b9 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -249,8 +249,6 @@ private[spark] class DAGScheduler( private[spark] val eventProcessLoop = new DAGSchedulerEventProcessLoop(this) taskScheduler.setDAGScheduler(this) - private val pushBasedShuffleEnabled = Utils.isPushBasedShuffleEnabled(sc.getConf) - /** * Called by the TaskSetManager to report task's starting. */ @@ -1258,17 +1256,15 @@ private[spark] class DAGScheduler( * If push based shuffle is enabled, set the shuffle services to be used for the given * shuffle map stage for block push/merge. * - * Even with DRA kicking in and significantly reducing the number of available active - * executors, we would still be able to get sufficient shuffle service locations for - * block push/merge by getting the historical locations of past executors. + * Even with dynamic resource allocation kicking in and significantly reducing the number + * of available active executors, we would still be able to get sufficient shuffle service + * locations for block push/merge by getting the historical locations of past executors. 
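+   *
+   * The BlockManagerSuite test added earlier in this series exercises exactly this;
+   * a condensed, illustrative sequence (host and executor names hypothetical):
+   * {{{
+   *   makeBlockManager(100, "execA",
+   *     transferService = Some(new MockBlockTransferService(10, "hostA")))
+   *   master.removeExecutor("execA")
+   *   // hostA is still returned: the master caches the shuffle service host,
+   *   // not the executor that registered it
+   *   assert(master.getShufflePushMergerLocations(1, Set.empty).map(_.host) == Seq("hostA"))
+   * }}}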
*/ private def prepareShuffleServicesForShuffleMapStage(stage: ShuffleMapStage) { // TODO: Handle stage reuse/retry cases separately as without finalize changes we cannot // TODO: disable shuffle merge for the retry/reuse cases val mergerLocs = sc.schedulerBackend.getShufflePushMergerLocations( stage.shuffleDep.partitioner.numPartitions, stage.resourceProfileId) - logDebug(s"List of shuffle push merger locations " + - s"${stage.shuffleDep.getMergerLocs.map(_.host).mkString(", ")}") if (mergerLocs.nonEmpty) { stage.shuffleDep.setMergerLocs(mergerLocs) @@ -1278,6 +1274,9 @@ private[spark] class DAGScheduler( stage.shuffleDep.setShuffleMergeEnabled(false) logInfo("Shuffle merge disabled for %s (%s)".format(stage, stage.name)) } + + logDebug(s"List of shuffle push merger locations " + + s"${stage.shuffleDep.getMergerLocs.map(_.host).mkString(", ")}") } /** Called when stage's parents are available and we can now do its task. */ @@ -1312,7 +1311,7 @@ private[spark] class DAGScheduler( // Only generate merger location for a given shuffle dependency once. This way, even if // this stage gets retried, it would still be merging blocks using the same set of // shuffle services. - if (s.shuffleDep.shuffleMergeEnabled) { + if (s.shuffleDep.isShuffleMergeEnabled) { prepareShuffleServicesForShuffleMapStage(s) } case s: ResultStage => diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala index 3531d21cda6e1..793721970356c 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala @@ -126,7 +126,7 @@ class BlockManagerMaster( } /** - * Get list of unique shuffle service locations where an executor is successfully + * Get a list of unique shuffle service locations where an executor is successfully * registered in the past for block push/merge with push based shuffle. 
*/ def getShufflePushMergerLocations( diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala index d1bf53755f94f..e9cc652ce65b4 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala @@ -685,7 +685,7 @@ class BlockManagerMasterEndpoint( numMergersNeeded: Int, hostsToFilter: Set[String]): Seq[BlockManagerId] = { val activeBlockManagers = blockManagerIdByExecutor.groupBy(_._2.host) - .mapValues(_.head).values.map(_._2).toSet + .mapValues(_.head).values.map(_._2).toSet val filteredActiveBlockManagers = activeBlockManagers .filterNot(x => hostsToFilter.contains(x.host)) val filteredActiveMergers = filteredActiveBlockManagers.map( diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala index 34b0bf0d2fc16..29966f29958e0 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala @@ -1974,7 +1974,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE } } - test("mergerLocations should be bounded with in" + + test("Shuffle push merger locations should be bounded with in" + " spark.shuffle.push.retainedMergerLocations") { assert(master.getShufflePushMergerLocations(10, Set.empty).isEmpty) makeBlockManager(100, "execA", diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala index 2306271def28e..f15642c003aab 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala @@ -183,7 +183,7 @@ private[spark] abstract class YarnSchedulerBackend( numExecutors } val tasksPerExecutor = sc.resourceProfileManager - .resourceProfileFromId(resourceProfileId).maxTasksPerExecutor(sc.conf) + .resourceProfileFromId(resourceProfileId).maxTasksPerExecutor(sc.conf) val numMergersNeeded = math.min( math.max(1, math.ceil(numPartitions / tasksPerExecutor).toInt), maxExecutors) val minMergersThreshold = math.max(minMergersStaticThreshold, From 1e768246370f3a74f9d1b53624cd3907dcaad28c Mon Sep 17 00:00:00 2001 From: Venkata krishnan Sowrirajan Date: Thu, 5 Nov 2020 19:57:30 -0800 Subject: [PATCH 09/21] Address ngone51 review comments --- core/src/main/scala/org/apache/spark/Dependency.scala | 11 ----------- .../org/apache/spark/scheduler/DAGScheduler.scala | 5 +++-- 2 files changed, 3 insertions(+), 13 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/Dependency.scala b/core/src/main/scala/org/apache/spark/Dependency.scala index 2f685ba974ebe..ceff3a7d5d4e5 100644 --- a/core/src/main/scala/org/apache/spark/Dependency.scala +++ b/core/src/main/scala/org/apache/spark/Dependency.scala @@ -24,7 +24,6 @@ import org.apache.spark.rdd.RDD import org.apache.spark.serializer.Serializer import org.apache.spark.shuffle.{ShuffleHandle, ShuffleWriteProcessor} import org.apache.spark.storage.BlockManagerId -import org.apache.spark.util.Utils /** * :: DeveloperApi :: @@ -97,16 +96,6 @@ class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag]( val 
shuffleHandle: ShuffleHandle = _rdd.context.env.shuffleManager.registerShuffle( shuffleId, this) - // By default, shuffle merge is enabled for ShuffleDependency if push based shuffle is enabled - private[spark] var shuffleMergeEnabled = - Utils.isPushBasedShuffleEnabled(rdd.sparkContext.getConf) - - def setShuffleMergeEnabled(shuffleMergeEnabled: Boolean): Unit = { - this.shuffleMergeEnabled = shuffleMergeEnabled - } - - def isShuffleMergeEnabled : Boolean = shuffleMergeEnabled - /** * Stores the location of the list of chosen external shuffle services for handling the * shuffle merge requests from mappers in this shuffle map stage. diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index c8a9298de73b9..f4ca94a48eacf 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -249,6 +249,8 @@ private[spark] class DAGScheduler( private[spark] val eventProcessLoop = new DAGSchedulerEventProcessLoop(this) taskScheduler.setDAGScheduler(this) + private val pushBasedShuffleEnabled = Utils.isPushBasedShuffleEnabled(sc.getConf) + /** * Called by the TaskSetManager to report task's starting. */ @@ -1271,7 +1273,6 @@ private[spark] class DAGScheduler( logInfo("Shuffle merge enabled for %s (%s) with %d merger locations" .format(stage, stage.name, stage.shuffleDep.getMergerLocs.size)) } else { - stage.shuffleDep.setShuffleMergeEnabled(false) logInfo("Shuffle merge disabled for %s (%s)".format(stage, stage.name)) } @@ -1311,7 +1312,7 @@ private[spark] class DAGScheduler( // Only generate merger location for a given shuffle dependency once. This way, even if // this stage gets retried, it would still be merging blocks using the same set of // shuffle services. - if (s.shuffleDep.isShuffleMergeEnabled) { + if (pushBasedShuffleEnabled) { prepareShuffleServicesForShuffleMapStage(s) } case s: ResultStage => From 3e0310955135764422c3a2723283cd1f0935d8cc Mon Sep 17 00:00:00 2001 From: Venkata krishnan Sowrirajan Date: Mon, 9 Nov 2020 11:28:30 -0800 Subject: [PATCH 10/21] Address Mridul's review comments --- .../cluster/YarnSchedulerBackend.scala | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala index f15642c003aab..afb275ae223f5 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala @@ -175,8 +175,8 @@ private[spark] abstract class YarnSchedulerBackend( override def getShufflePushMergerLocations( numPartitions: Int, resourceProfileId: Int): Seq[BlockManagerId] = { - // Currently this is naive way of calculating numMergersNeeded for a stage. In future, - // we can use better heuristics to calculate numMergersNeeded for a stage. + // Currently this is naive way of calculating numMergersDesired for a stage. In future, + // we can use better heuristics to calculate numMergersDesired for a stage. 
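+    // Worked example with illustrative numbers (not from this patch): for
+    // numPartitions = 1000, tasksPerExecutor = 4 and maxNumExecutors = 200 under
+    // dynamic allocation, numMergersDesired = min(max(1, ceil(1000 / 4)), 200) = 200
+    // and, with the default thresholds, minMergersNeeded =
+    // max(5, floor(200 * 0.05)) = 10, so push based shuffle stays enabled as long
+    // as at least 10 merger locations come back. Note that
+    // numPartitions / tasksPerExecutor below is integer division.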
val maxExecutors = if (Utils.isDynamicAllocationEnabled(sc.getConf)) { maxNumExecutors } else { @@ -184,14 +184,17 @@ private[spark] abstract class YarnSchedulerBackend( } val tasksPerExecutor = sc.resourceProfileManager .resourceProfileFromId(resourceProfileId).maxTasksPerExecutor(sc.conf) - val numMergersNeeded = math.min( + val numMergersDesired = math.min( math.max(1, math.ceil(numPartitions / tasksPerExecutor).toInt), maxExecutors) - val minMergersThreshold = math.max(minMergersStaticThreshold, - math.floor(numMergersNeeded * minMergersThresholdRatio).toInt) + val minMergersNeeded = math.max(minMergersStaticThreshold, + math.floor(numMergersDesired * minMergersThresholdRatio).toInt) + + // Request for numMergersDesired shuffle mergers to BlockManagerMasterEndpoint + // and if its less than minMergersNeeded, we disable push based shuffle. val mergerLocations = blockManagerMaster - .getShufflePushMergerLocations(numMergersNeeded, scheduler.nodeBlacklist()) + .getShufflePushMergerLocations(numMergersDesired, scheduler.nodeBlacklist()) logDebug(s"Num merger locations available ${mergerLocations.length}") - if (mergerLocations.size < numMergersNeeded && mergerLocations.size < minMergersThreshold) { + if (mergerLocations.size < numMergersDesired && mergerLocations.size < minMergersNeeded) { Seq.empty[BlockManagerId] } else { mergerLocations From a1c8831b1c73b48cf3d5839668d871826d9232eb Mon Sep 17 00:00:00 2001 From: Venkata krishnan Sowrirajan Date: Tue, 10 Nov 2020 10:32:33 -0800 Subject: [PATCH 11/21] Addressed review comments --- .../scala/org/apache/spark/Dependency.scala | 2 +- .../spark/internal/config/package.scala | 10 +++++--- .../apache/spark/scheduler/DAGScheduler.scala | 2 +- .../storage/BlockManagerMasterEndpoint.scala | 25 +++++++++++-------- .../cluster/YarnSchedulerBackend.scala | 6 +++-- 5 files changed, 27 insertions(+), 18 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/Dependency.scala b/core/src/main/scala/org/apache/spark/Dependency.scala index ceff3a7d5d4e5..d21b9d9833e9e 100644 --- a/core/src/main/scala/org/apache/spark/Dependency.scala +++ b/core/src/main/scala/org/apache/spark/Dependency.scala @@ -103,7 +103,7 @@ class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag]( private[spark] var mergerLocs: Seq[BlockManagerId] = Nil def setMergerLocs(mergerLocs: Seq[BlockManagerId]): Unit = { - if (mergerLocs != null && mergerLocs.length > 0) { + if (mergerLocs != null) { this.mergerLocs = mergerLocs } } diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala index 36917efb87886..6ebdbc7dcb7e0 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/package.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala @@ -1946,30 +1946,34 @@ package object config { "which needs to be set with the appropriate" + "org.apache.spark.network.shuffle.MergedShuffleFileManager implementation for push-based" + "shuffle to be enabled") + .version("3.1.0") .booleanConf .createWithDefault(false) - private[spark] val MAX_MERGER_LOCATIONS_CACHED = + private[spark] val SHUFFLE_MERGERS_MAX_RETAINED_LOCATIONS = ConfigBuilder("spark.shuffle.push.retainedMergerLocations") .doc("Maximum number of shuffle push mergers locations cached for push based shuffle." 
+ "Currently Shuffle push merger locations are nothing but shuffle services where an" + "executor is launched in the case of Push based shuffle.") + .version("3.1.0") .intConf .createWithDefault(500) - private[spark] val MERGER_LOCATIONS_MIN_THRESHOLD_RATIO = + private[spark] val SHUFFLE_MERGER_LOCATIONS_MIN_THRESHOLD_RATIO = ConfigBuilder("spark.shuffle.push.mergersMinThresholdRatio") .doc("Minimum percentage of shuffle push mergers locations required to enable push based" + "shuffle for the stage with respect to number of partitions of the child stage. This is" + " the number of unique Node Manager locations needed to enable push based shuffle.") + .version("3.1.0") .doubleConf .createWithDefault(0.05) - private[spark] val MERGER_LOCATIONS_MIN_STATIC_THRESHOLD = + private[spark] val SHUFFLE_MERGER_LOCATIONS_MIN_STATIC_THRESHOLD = ConfigBuilder("spark.shuffle.push.mergersMinStaticThreshold") .doc("Minimum static number of of shuffle push mergers locations should be available in" + " order to enable push based shuffle for a stage. Note this config works in" + " conjunction with spark.shuffle.push.mergersMinThresholdRatio") + .version("3.1.0") .doubleConf .createWithDefault(5) } diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 997e82ac6b73c..90b0dc1e6c94e 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -1262,7 +1262,7 @@ private[spark] class DAGScheduler( * of available active executors, we would still be able to get sufficient shuffle service * locations for block push/merge by getting the historical locations of past executors. */ - private def prepareShuffleServicesForShuffleMapStage(stage: ShuffleMapStage) { + private def prepareShuffleServicesForShuffleMapStage(stage: ShuffleMapStage): Unit = { // TODO: Handle stage reuse/retry cases separately as without finalize changes we cannot // TODO: disable shuffle merge for the retry/reuse cases val mergerLocs = sc.schedulerBackend.getShufflePushMergerLocations( diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala index e9e1a91ffad1e..ad1df0e5cb444 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala @@ -80,7 +80,7 @@ class BlockManagerMasterEndpoint( private val shuffleMergerLocations = new mutable.LinkedHashMap[String, BlockManagerId]() // Maximum number of merger locations to cache - private val maxRetainedMergerLocations = conf.get(config.MAX_MERGER_LOCATIONS_CACHED) + private val maxRetainedMergerLocations = conf.get(config.SHUFFLE_MERGERS_MAX_RETAINED_LOCATIONS) private val askThreadPool = ThreadUtils.newDaemonCachedThreadPool("block-manager-ask-thread-pool", 100) @@ -372,7 +372,7 @@ class BlockManagerMasterEndpoint( } private def addMergerLocation(blockManagerId: BlockManagerId): Unit = { - if (!shuffleMergerLocations.contains(blockManagerId.host) && !blockManagerId.isDriver) { + if (!blockManagerId.isDriver && !shuffleMergerLocations.contains(blockManagerId.host)) { val shuffleServerId = BlockManagerId(blockManagerId.executorId, blockManagerId.host, StorageUtils.externalShuffleServicePort(conf)) if (shuffleMergerLocations.size >= maxRetainedMergerLocations) { @@ -684,22 +684,25 @@ class 
BlockManagerMasterEndpoint( private def getShufflePushMergerLocations( numMergersNeeded: Int, hostsToFilter: Set[String]): Seq[BlockManagerId] = { - val activeBlockManagers = blockManagerIdByExecutor.groupBy(_._2.host) + val blockManagersWithExecutors = blockManagerIdByExecutor.groupBy(_._2.host) .mapValues(_.head).values.map(_._2).toSet - val filteredActiveBlockManagers = activeBlockManagers + val filteredBlockManagersWithExecutors = blockManagersWithExecutors .filterNot(x => hostsToFilter.contains(x.host)) - val filteredActiveMergers = filteredActiveBlockManagers.map( + val filteredMergersWithExecutors = filteredBlockManagersWithExecutors.map( x => BlockManagerId(x.executorId, x.host, StorageUtils.externalShuffleServicePort(conf))) // Enough mergers are available as part of active executors list - if (filteredActiveMergers.size >= numMergersNeeded) { - filteredActiveMergers.toSeq + if (filteredMergersWithExecutors.size >= numMergersNeeded) { + filteredMergersWithExecutors.toSeq } else { // Delta mergers added from inactive mergers list to the active mergers list - val filteredDeadMergers = shuffleMergerLocations.values - .filterNot(mergerHost => filteredActiveMergers.exists(x => x.host == mergerHost.host)) - filteredActiveMergers.toSeq ++ - filteredDeadMergers.toSeq.take(numMergersNeeded - filteredActiveMergers.size) + val filteredMergersWithExecutorsHosts = filteredMergersWithExecutors.map(_.host) + val filteredMergersWithoutExecutors = shuffleMergerLocations.values + .filterNot( + mergerHost => filteredMergersWithExecutorsHosts.contains(mergerHost.host)) + filteredMergersWithExecutors.toSeq ++ + filteredMergersWithoutExecutors.toSeq + .take(numMergersNeeded - filteredMergersWithExecutors.size) } } diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala index 1cfb89077ea6c..f5e0a26d5dfd0 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala @@ -83,9 +83,11 @@ private[spark] abstract class YarnSchedulerBackend( private val blockManagerMaster: BlockManagerMaster = sc.env.blockManager.master - private val minMergersThresholdRatio = conf.get(config.MERGER_LOCATIONS_MIN_THRESHOLD_RATIO) + private val minMergersThresholdRatio = + conf.get(config.SHUFFLE_MERGER_LOCATIONS_MIN_THRESHOLD_RATIO) - private val minMergersStaticThreshold = conf.get(config.MERGER_LOCATIONS_MIN_STATIC_THRESHOLD) + private val minMergersStaticThreshold = + conf.get(config.SHUFFLE_MERGER_LOCATIONS_MIN_STATIC_THRESHOLD) private val maxNumExecutors = conf.get(DYN_ALLOCATION_MAX_EXECUTORS) From 047ad0c48d23d06e07c30fbfdfc1754426365866 Mon Sep 17 00:00:00 2001 From: Venkata krishnan Sowrirajan Date: Tue, 10 Nov 2020 16:20:08 -0800 Subject: [PATCH 12/21] Address review comments --- .../spark/internal/config/package.scala | 32 ++++++++++++------- .../apache/spark/scheduler/DAGScheduler.scala | 4 +-- .../storage/BlockManagerMasterEndpoint.scala | 6 +++- 3 files changed, 27 insertions(+), 15 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala index 6ebdbc7dcb7e0..77dba2c4eb6b9 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/package.scala +++ 
b/core/src/main/scala/org/apache/spark/internal/config/package.scala @@ -1941,10 +1941,10 @@ package object config { private[spark] val PUSH_BASED_SHUFFLE_ENABLED = ConfigBuilder("spark.shuffle.push.enabled") - .doc("Set to 'true' to enable push based shuffle on the client side and this works in" + - "conjunction with the server side flag spark.shuffle.server.mergedShuffleFileManagerImpl" + - "which needs to be set with the appropriate" + - "org.apache.spark.network.shuffle.MergedShuffleFileManager implementation for push-based" + + .doc("Set to 'true' to enable push based shuffle on the client side and this works in " + + "conjunction with the server side flag spark.shuffle.server.mergedShuffleFileManagerImpl " + + "which needs to be set with the appropriate " + + "org.apache.spark.network.shuffle.MergedShuffleFileManager implementation for push-based " + "shuffle to be enabled") .version("3.1.0") .booleanConf @@ -1952,8 +1952,8 @@ package object config { private[spark] val SHUFFLE_MERGERS_MAX_RETAINED_LOCATIONS = ConfigBuilder("spark.shuffle.push.retainedMergerLocations") - .doc("Maximum number of shuffle push mergers locations cached for push based shuffle." + - "Currently Shuffle push merger locations are nothing but shuffle services where an" + + .doc("Maximum number of shuffle push mergers locations cached for push based shuffle. " + + "Currently shuffle push merger locations are nothing but shuffle services where an " + "executor is launched in the case of Push based shuffle.") .version("3.1.0") .intConf @@ -1961,18 +1961,26 @@ package object config { private[spark] val SHUFFLE_MERGER_LOCATIONS_MIN_THRESHOLD_RATIO = ConfigBuilder("spark.shuffle.push.mergersMinThresholdRatio") - .doc("Minimum percentage of shuffle push mergers locations required to enable push based" + - "shuffle for the stage with respect to number of partitions of the child stage. This is" + - " the number of unique Node Manager locations needed to enable push based shuffle.") + .doc("The minimum number of shuffle merger locations required to enable push based " + + "shuffle for a stage. This is specified as a ratio of the number of partitions in " + + "the child stage. For example, a reduce stage which has 100 partitions and uses the " + + "default value 0.05 requires at least 5 unique merger locations to enable push based " + + "shuffle. Merger locations are currently defined as external shuffle services.") .version("3.1.0") .doubleConf .createWithDefault(0.05) private[spark] val SHUFFLE_MERGER_LOCATIONS_MIN_STATIC_THRESHOLD = ConfigBuilder("spark.shuffle.push.mergersMinStaticThreshold") - .doc("Minimum static number of of shuffle push mergers locations should be available in" + - " order to enable push based shuffle for a stage. Note this config works in" + - " conjunction with spark.shuffle.push.mergersMinThresholdRatio") + .doc("The static threshold for number of shuffle push mergers locations should be " + + " available in order to enable push based shuffle for a stage. Note this config " + + "works in conjunction with spark.shuffle.push.mergersMinThresholdRatio. Maximum " + + "of spark.shuffle.push.mergersMinStaticThreshold and " + + "spark.shuffle.push.mergersMinThresholdRatio ratio number of mergers needed to " + + "enable push based shuffle for a stage. 
For eg: with 1000 partitions for the child " + + "stage with spark.shuffle.push.mergersMinStaticThreshold as 5 and " + + "spark.shuffle.push.mergersMinThresholdRatio set to 0.05, we would need atleast 50 " + + "mergers to enable push based shuffle for that stage.") .version("3.1.0") .doubleConf .createWithDefault(5) diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 90b0dc1e6c94e..2b8278c3002f2 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -1263,8 +1263,8 @@ private[spark] class DAGScheduler( * locations for block push/merge by getting the historical locations of past executors. */ private def prepareShuffleServicesForShuffleMapStage(stage: ShuffleMapStage): Unit = { - // TODO: Handle stage reuse/retry cases separately as without finalize changes we cannot - // TODO: disable shuffle merge for the retry/reuse cases + // TODO SPARK-32920: Handle stage reuse/retry cases separately as without finalize + // TODO changes we cannot disable shuffle merge for the retry/reuse cases val mergerLocs = sc.schedulerBackend.getShufflePushMergerLocations( stage.shuffleDep.partitioner.numPartitions, stage.resourceProfileId) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala index ad1df0e5cb444..251f8f7cb3004 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala @@ -100,6 +100,8 @@ class BlockManagerMasterEndpoint( val defaultRpcTimeout = RpcUtils.askRpcTimeout(conf) + private val pushBasedShuffleEnabled = Utils.isPushBasedShuffleEnabled(conf) + logInfo("BlockManagerMasterEndpoint up") // same as `conf.get(config.SHUFFLE_SERVICE_ENABLED) // && conf.get(config.SHUFFLE_SERVICE_FETCH_RDD_ENABLED)` @@ -549,7 +551,9 @@ class BlockManagerMasterEndpoint( blockManagerInfo(id) = new BlockManagerInfo(id, System.currentTimeMillis(), maxOnHeapMemSize, maxOffHeapMemSize, storageEndpoint, externalShuffleServiceBlockStatus) - addMergerLocation(id) + if (pushBasedShuffleEnabled) { + addMergerLocation(id) + } } listenerBus.post(SparkListenerBlockManagerAdded(time, id, maxOnHeapMemSize + maxOffHeapMemSize, Some(maxOnHeapMemSize), Some(maxOffHeapMemSize))) From 2d6d26655d63cd59a97358483c8b765c3df95d50 Mon Sep 17 00:00:00 2001 From: Venkata krishnan Sowrirajan Date: Thu, 12 Nov 2020 13:19:29 -0800 Subject: [PATCH 13/21] Address ngone51 review comments --- .../org/apache/spark/internal/config/package.scala | 12 ++++++------ .../org/apache/spark/scheduler/DAGScheduler.scala | 9 +++++---- .../apache/spark/scheduler/SchedulerBackend.scala | 2 +- .../apache/spark/storage/BlockManagerMaster.scala | 2 +- .../spark/storage/BlockManagerMasterEndpoint.scala | 4 ++-- .../main/scala/org/apache/spark/util/Utils.scala | 4 +++- .../apache/spark/storage/BlockManagerSuite.scala | 13 +++++++------ .../scheduler/cluster/YarnSchedulerBackend.scala | 7 +++---- 8 files changed, 28 insertions(+), 25 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala index 77dba2c4eb6b9..59ee65a7716f0 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/package.scala +++ 
b/core/src/main/scala/org/apache/spark/internal/config/package.scala @@ -1972,15 +1972,15 @@ package object config { private[spark] val SHUFFLE_MERGER_LOCATIONS_MIN_STATIC_THRESHOLD = ConfigBuilder("spark.shuffle.push.mergersMinStaticThreshold") - .doc("The static threshold for number of shuffle push mergers locations should be " + + .doc(s"The static threshold for number of shuffle push mergers locations should be " + " available in order to enable push based shuffle for a stage. Note this config " + - "works in conjunction with spark.shuffle.push.mergersMinThresholdRatio. Maximum " + + s"works in conjunction with ${SHUFFLE_MERGER_LOCATIONS_MIN_THRESHOLD_RATIO.key}. Maximum " + "of spark.shuffle.push.mergersMinStaticThreshold and " + - "spark.shuffle.push.mergersMinThresholdRatio ratio number of mergers needed to " + + s"${SHUFFLE_MERGER_LOCATIONS_MIN_THRESHOLD_RATIO.key} ratio number of mergers needed to " + "enable push based shuffle for a stage. For eg: with 1000 partitions for the child " + - "stage with spark.shuffle.push.mergersMinStaticThreshold as 5 and " + - "spark.shuffle.push.mergersMinThresholdRatio set to 0.05, we would need atleast 50 " + - "mergers to enable push based shuffle for that stage.") + s"stage with spark.shuffle.push.mergersMinStaticThreshold as 5 and " + + s"${SHUFFLE_MERGER_LOCATIONS_MIN_THRESHOLD_RATIO.key} set to 0.05, we would need" + + s" atleast 50 mergers to enable push based shuffle for that stage.") .version("3.1.0") .doubleConf .createWithDefault(5) diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 2b8278c3002f2..fe3bf177de2c6 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -1272,12 +1272,13 @@ private[spark] class DAGScheduler( stage.shuffleDep.setMergerLocs(mergerLocs) logInfo("Shuffle merge enabled for %s (%s) with %d merger locations" .format(stage, stage.name, stage.shuffleDep.getMergerLocs.size)) + + logDebug(s"List of shuffle push merger locations " + + s"${stage.shuffleDep.getMergerLocs.map(_.host).mkString(", ")}") } else { - logInfo("Shuffle merge disabled for %s (%s)".format(stage, stage.name)) + logInfo(("No available merger locations." + + " Shuffle merge disabled for %s (%s)").format(stage, stage.name)) } - - logDebug(s"List of shuffle push merger locations " + - s"${stage.shuffleDep.getMergerLocs.map(_.host).mkString(", ")}") } /** Called when stage's parents are available and we can now do its task. */ diff --git a/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala index b27eb243c6535..b2acdb3e12a6d 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala @@ -94,7 +94,7 @@ private[spark] trait SchedulerBackend { def maxNumConcurrentTasks(rp: ResourceProfile): Int /** - * Get the list of both active and dead executors host locations for push based shuffle + * Get the list of host locations for push based shuffle * * Currently push based shuffle is disabled for both stage retry and stage reuse cases * (for eg: in the case where few partitions are lost due to failure). 
Hence this method diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala index 793721970356c..812e241f968de 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala @@ -24,7 +24,7 @@ import scala.concurrent.Future import org.apache.spark.{SparkConf, SparkException} import org.apache.spark.internal.Logging import org.apache.spark.rpc.RpcEndpointRef -import org.apache.spark.storage.BlockManagerMessages.{GetShufflePushMergerLocations, _} +import org.apache.spark.storage.BlockManagerMessages._ import org.apache.spark.util.{RpcUtils, ThreadUtils} private[spark] diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala index 251f8f7cb3004..b31ed437a59ab 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala @@ -702,8 +702,8 @@ class BlockManagerMasterEndpoint( // Delta mergers added from inactive mergers list to the active mergers list val filteredMergersWithExecutorsHosts = filteredMergersWithExecutors.map(_.host) val filteredMergersWithoutExecutors = shuffleMergerLocations.values - .filterNot( - mergerHost => filteredMergersWithExecutorsHosts.contains(mergerHost.host)) + .filterNot(x => hostsToFilter.contains(x.host)) + .filterNot(x => filteredMergersWithExecutorsHosts.contains(x.host)) filteredMergersWithExecutors.toSeq ++ filteredMergersWithoutExecutors.toSeq .take(numMergersNeeded - filteredMergersWithExecutors.size) diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index d874f40157336..31970ee989543 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -2547,7 +2547,9 @@ private[spark] object Utils extends Logging { * at the same time. Will improve this in a later version. 
*/ def isPushBasedShuffleEnabled(conf: SparkConf): Boolean = { - conf.get(PUSH_BASED_SHUFFLE_ENABLED) && conf.get(SHUFFLE_SERVICE_ENABLED) + val pushBasedEnabled = conf.get(PUSH_BASED_SHUFFLE_ENABLED) + pushBasedEnabled && + (conf.get(IS_TESTING).getOrElse(false) || conf.get(SHUFFLE_SERVICE_ENABLED)) } /** diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala index 46d8db280b653..0b3d245a49a4d 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala @@ -100,6 +100,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE .set(Kryo.KRYO_SERIALIZER_BUFFER_SIZE.key, "1m") .set(STORAGE_UNROLL_MEMORY_THRESHOLD, 512L) .set(Network.RPC_ASK_TIMEOUT, "5s") + .set(PUSH_BASED_SHUFFLE_ENABLED, true) } private def makeSortShuffleManager(): SortShuffleManager = { @@ -1988,8 +1989,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE makeBlockManager(100, "execE", transferService = Some(new MockBlockTransferService(10, "hostA"))) assert(master.getShufflePushMergerLocations(10, Set.empty).size == 4) - assert(master.getShufflePushMergerLocations(10, Set.empty) - .exists(x => Seq("hostC", "hostD", "hostA", "hostB").contains(x.host))) + assert(master.getShufflePushMergerLocations(10, Set.empty).map(_.host).sorted === + Seq("hostC", "hostD", "hostA", "hostB").sorted) assert(master.getShufflePushMergerLocations(10, Set("hostB")).size == 3) } @@ -2010,10 +2011,10 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE master.removeExecutor("execE") assert(master.getShufflePushMergerLocations(3, Set.empty).size == 3) - val expectedHosts = Set("hostB", "hostC", "hostD") - val shufflePushMergers = master - .getShufflePushMergerLocations(3, Set.empty).map(x => x.host).toSet - assert(expectedHosts.forall(x => shufflePushMergers.contains(x))) + assert(master.getShufflePushMergerLocations(3, Set.empty).map(_.host).sorted === + Seq("hostC", "hostB", "hostD").sorted) + assert(master.getShufflePushMergerLocations(4, Set.empty).map(_.host).sorted === + Seq("hostB", "hostA", "hostC", "hostD").sorted) } test("SPARK-33387 Support ordered shuffle block migration") { diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala index f5e0a26d5dfd0..04f6506a19f41 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala @@ -30,7 +30,6 @@ import org.apache.hadoop.yarn.api.records.{ApplicationAttemptId, ApplicationId} import org.apache.spark.SparkContext import org.apache.spark.deploy.security.HadoopDelegationTokenManager import org.apache.spark.internal.{config, Logging} -import org.apache.spark.internal.config.{DYN_ALLOCATION_MAX_EXECUTORS, EXECUTOR_INSTANCES} import org.apache.spark.internal.config.UI._ import org.apache.spark.resource.ResourceProfile import org.apache.spark.rpc._ @@ -89,9 +88,9 @@ private[spark] abstract class YarnSchedulerBackend( private val minMergersStaticThreshold = conf.get(config.SHUFFLE_MERGER_LOCATIONS_MIN_STATIC_THRESHOLD) - private val maxNumExecutors = conf.get(DYN_ALLOCATION_MAX_EXECUTORS) + private val 
maxNumExecutors = conf.get(config.DYN_ALLOCATION_MAX_EXECUTORS) - private val numExecutors = conf.get(EXECUTOR_INSTANCES).getOrElse(0) + private val numExecutors = conf.get(config.EXECUTOR_INSTANCES).getOrElse(0) /** * Bind to YARN. This *must* be done before calling [[start()]]. @@ -194,7 +193,7 @@ private[spark] abstract class YarnSchedulerBackend( // Request for numMergersDesired shuffle mergers to BlockManagerMasterEndpoint // and if its less than minMergersNeeded, we disable push based shuffle. val mergerLocations = blockManagerMaster - .getShufflePushMergerLocations(numMergersDesired, scheduler.nodeBlacklist()) + .getShufflePushMergerLocations(numMergersDesired, scheduler.excludedNodes()) logDebug(s"Num merger locations available ${mergerLocations.length}") if (mergerLocations.size < numMergersDesired && mergerLocations.size < minMergersNeeded) { Seq.empty[BlockManagerId] From 2b0c073cc1b596d9404e524ecba7eb4bda72497d Mon Sep 17 00:00:00 2001 From: Venkata krishnan Sowrirajan Date: Thu, 12 Nov 2020 15:58:59 -0800 Subject: [PATCH 14/21] pick hosts in random order and remove host from shuffle push merger if a fetch failed exception happens on that host --- .../spark/scheduler/TaskSetManager.scala | 2 ++ .../spark/storage/BlockManagerMaster.scala | 9 +++++++++ .../storage/BlockManagerMasterEndpoint.scala | 18 ++++++++++++++---- .../spark/storage/BlockManagerMessages.scala | 2 ++ 4 files changed, 27 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala index 0cfa76583bfbb..cee34d6f9f0fd 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@ -824,6 +824,8 @@ private[spark] class TaskSetManager( if (fetchFailed.bmAddress != null) { healthTracker.foreach(_.updateExcludedForFetchFailure( fetchFailed.bmAddress.host, fetchFailed.bmAddress.executorId)) + // Remove fetchFailed host in the shuffle push merger list for push based shuffle + env.blockManager.master.removeShufflePushMergerLocation(fetchFailed.bmAddress.host) } None diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala index 812e241f968de..fe1a5aef9499c 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala @@ -136,6 +136,15 @@ class BlockManagerMaster( GetShufflePushMergerLocations(numMergersNeeded, hostsToFilter)) } + /** + * Remove the host from the candidate list of shuffle push mergers. 
This can be + * triggered if there is a FetchFailedException on the host + * @param host + */ + def removeShufflePushMergerLocation(host: String): Unit = { + driverEndpoint.askSync[Seq[BlockManagerId]](RemoveShufflePushMergerLocation(host)) + } + def getExecutorEndpointRef(executorId: String): Option[RpcEndpointRef] = { driverEndpoint.askSync[Option[RpcEndpointRef]](GetExecutorEndpointRef(executorId)) } diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala index b31ed437a59ab..311a9f23b4587 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala @@ -152,6 +152,9 @@ class BlockManagerMasterEndpoint( case GetShufflePushMergerLocations(numMergersNeeded, hostsToFilter) => context.reply(getShufflePushMergerLocations(numMergersNeeded, hostsToFilter)) + case RemoveShufflePushMergerLocation(host) => + context.reply(removeShufflePushMergerLocation(host)) + case IsExecutorAlive(executorId) => context.reply(blockManagerIdByExecutor.contains(executorId)) @@ -701,12 +704,19 @@ class BlockManagerMasterEndpoint( } else { // Delta mergers added from inactive mergers list to the active mergers list val filteredMergersWithExecutorsHosts = filteredMergersWithExecutors.map(_.host) - val filteredMergersWithoutExecutors = shuffleMergerLocations.values + // Pick random hosts instead of preferring the top of the list + val randomizedShuffleMergerLocations = Utils.randomize(shuffleMergerLocations.values.toSeq) + val filteredMergersWithoutExecutors = randomizedShuffleMergerLocations .filterNot(x => hostsToFilter.contains(x.host)) .filterNot(x => filteredMergersWithExecutorsHosts.contains(x.host)) - filteredMergersWithExecutors.toSeq ++ - filteredMergersWithoutExecutors.toSeq - .take(numMergersNeeded - filteredMergersWithExecutors.size) + filteredMergersWithExecutors.toSeq ++ filteredMergersWithoutExecutors + .take(numMergersNeeded - filteredMergersWithExecutors.size) + } + } + + private def removeShufflePushMergerLocation(host: String): Unit = { + if (shuffleMergerLocations.contains(host)) { + shuffleMergerLocations.remove(host) } } diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala index 098c98d9f6dd2..afe416a55ed0d 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala @@ -145,4 +145,6 @@ private[spark] object BlockManagerMessages { case class GetShufflePushMergerLocations(numMergersNeeded: Int, hostsToFilter: Set[String]) extends ToBlockManagerMaster + case class RemoveShufflePushMergerLocation(host: String) extends ToBlockManagerMaster + } From 9ba4dfbed66fab7badf279ce3e83d4db569d61e2 Mon Sep 17 00:00:00 2001 From: Venkata krishnan Sowrirajan Date: Fri, 13 Nov 2020 10:52:53 -0800 Subject: [PATCH 15/21] Addressed ngone51 comments --- .../apache/spark/internal/config/package.scala | 15 ++++++++------- .../org/apache/spark/scheduler/DAGScheduler.scala | 14 +++++++++----- .../apache/spark/scheduler/TaskSetManager.scala | 2 -- .../main/scala/org/apache/spark/util/Utils.scala | 3 +-- .../apache/spark/storage/BlockManagerSuite.scala | 4 +--- 5 files changed, 19 insertions(+), 19 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala 
b/core/src/main/scala/org/apache/spark/internal/config/package.scala index 59ee65a7716f0..3821be163b258 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/package.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala @@ -1953,8 +1953,9 @@ package object config { private[spark] val SHUFFLE_MERGERS_MAX_RETAINED_LOCATIONS = ConfigBuilder("spark.shuffle.push.retainedMergerLocations") .doc("Maximum number of shuffle push mergers locations cached for push based shuffle. " + - "Currently shuffle push merger locations are nothing but shuffle services where an " + - "executor is launched in the case of Push based shuffle.") + "Currently, shuffle push merger locations are nothing but external shuffle services " + + "which are responsible for handling pushed blocks and merging them and serving " + + "merged blocks for later shuffle fetch.") .version("3.1.0") .intConf .createWithDefault(500) @@ -1973,14 +1974,14 @@ package object config { private[spark] val SHUFFLE_MERGER_LOCATIONS_MIN_STATIC_THRESHOLD = ConfigBuilder("spark.shuffle.push.mergersMinStaticThreshold") .doc(s"The static threshold for number of shuffle push mergers locations should be " + - " available in order to enable push based shuffle for a stage. Note this config " + - s"works in conjunction with ${SHUFFLE_MERGER_LOCATIONS_MIN_THRESHOLD_RATIO.key}. Maximum " + - "of spark.shuffle.push.mergersMinStaticThreshold and " + + "available in order to enable push based shuffle for a stage. Note this config " + + s"works in conjunction with ${SHUFFLE_MERGER_LOCATIONS_MIN_THRESHOLD_RATIO.key}. " + + s"Maximum of spark.shuffle.push.mergersMinStaticThreshold and " + s"${SHUFFLE_MERGER_LOCATIONS_MIN_THRESHOLD_RATIO.key} ratio number of mergers needed to " + "enable push based shuffle for a stage. For eg: with 1000 partitions for the child " + s"stage with spark.shuffle.push.mergersMinStaticThreshold as 5 and " + - s"${SHUFFLE_MERGER_LOCATIONS_MIN_THRESHOLD_RATIO.key} set to 0.05, we would need" + - s" atleast 50 mergers to enable push based shuffle for that stage.") + s"${SHUFFLE_MERGER_LOCATIONS_MIN_THRESHOLD_RATIO.key} set to 0.05, we would need " + + s"atleast 50 mergers to enable push based shuffle for that stage.") .version("3.1.0") .doubleConf .createWithDefault(5) diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index fe3bf177de2c6..ca1db32b52f3f 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -1270,14 +1270,13 @@ private[spark] class DAGScheduler( if (mergerLocs.nonEmpty) { stage.shuffleDep.setMergerLocs(mergerLocs) - logInfo("Shuffle merge enabled for %s (%s) with %d merger locations" - .format(stage, stage.name, stage.shuffleDep.getMergerLocs.size)) + logInfo(s"Shuffle merge enabled for $stage (${stage.name}) with" + + s" ${stage.shuffleDep.getMergerLocs.size} merger locations") - logDebug(s"List of shuffle push merger locations " + + logDebug("List of shuffle push merger locations " + s"${stage.shuffleDep.getMergerLocs.map(_.host).mkString(", ")}") } else { - logInfo(("No available merger locations." + - " Shuffle merge disabled for %s (%s)").format(stage, stage.name)) + logInfo(s"No available merger locations. 
Shuffle merge disabled for $stage (${stage.name})") } } @@ -1889,6 +1888,11 @@ private[spark] class DAGScheduler( // host, including from those that we still haven't confirmed as lost due to heartbeat // delays. ignoreShuffleFileLostEpoch = isHostDecommissioned) + + if (pushBasedShuffleEnabled) { + // Remove fetchFailed host in the shuffle push merger list for push based shuffle + env.blockManager.master.removeShufflePushMergerLocation(bmAddress.host) + } } } diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala index cee34d6f9f0fd..0cfa76583bfbb 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@ -824,8 +824,6 @@ private[spark] class TaskSetManager( if (fetchFailed.bmAddress != null) { healthTracker.foreach(_.updateExcludedForFetchFailure( fetchFailed.bmAddress.host, fetchFailed.bmAddress.executorId)) - // Remove fetchFailed host in the shuffle push merger list for push based shuffle - env.blockManager.master.removeShufflePushMergerLocation(fetchFailed.bmAddress.host) } None diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index 31970ee989543..27c7df58bd4cd 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -2547,8 +2547,7 @@ private[spark] object Utils extends Logging { * at the same time. Will improve this in a later version. */ def isPushBasedShuffleEnabled(conf: SparkConf): Boolean = { - val pushBasedEnabled = conf.get(PUSH_BASED_SHUFFLE_ENABLED) - pushBasedEnabled && + conf.get(PUSH_BASED_SHUFFLE_ENABLED) && (conf.get(IS_TESTING).getOrElse(false) || conf.get(SHUFFLE_SERVICE_ENABLED)) } diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala index 0b3d245a49a4d..ca7c8f86fdbdf 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala @@ -2040,7 +2040,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE class MockBlockTransferService( val maxFailures: Int, - hostname: String = "MockBlockTransferServiceHost") extends BlockTransferService { + override val hostName: String = "MockBlockTransferServiceHost") extends BlockTransferService { var numCalls = 0 var tempFileManager: DownloadFileManager = null @@ -2058,8 +2058,6 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE override def close(): Unit = {} - override def hostName: String = { hostname } - override def port: Int = { 63332 } override def uploadBlock( From 5127d8b2346fed966d4d5e0b4bb52aeaf620c557 Mon Sep 17 00:00:00 2001 From: Venkata krishnan Sowrirajan Date: Sun, 15 Nov 2020 12:32:53 -0800 Subject: [PATCH 16/21] Address review comments --- .../org/apache/spark/internal/config/package.scala | 2 +- .../org/apache/spark/scheduler/DAGScheduler.scala | 5 +++-- .../spark/storage/BlockManagerMasterEndpoint.scala | 13 +++++++++---- 3 files changed, 13 insertions(+), 7 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala index 3821be163b258..2ecfc952475b1 100644 --- 
a/core/src/main/scala/org/apache/spark/internal/config/package.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala @@ -1951,7 +1951,7 @@ package object config { .createWithDefault(false) private[spark] val SHUFFLE_MERGERS_MAX_RETAINED_LOCATIONS = - ConfigBuilder("spark.shuffle.push.retainedMergerLocations") + ConfigBuilder("spark.shuffle.push.maxRetainedMergerLocations") .doc("Maximum number of shuffle push mergers locations cached for push based shuffle. " + "Currently, shuffle push merger locations are nothing but external shuffle services " + "which are responsible for handling pushed blocks and merging them and serving " + diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index ca1db32b52f3f..128712ae273f2 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -1270,13 +1270,14 @@ private[spark] class DAGScheduler( if (mergerLocs.nonEmpty) { stage.shuffleDep.setMergerLocs(mergerLocs) - logInfo(s"Shuffle merge enabled for $stage (${stage.name}) with" + + logInfo(s"Push-based shuffle enabled for $stage (${stage.name}) with" + s" ${stage.shuffleDep.getMergerLocs.size} merger locations") logDebug("List of shuffle push merger locations " + s"${stage.shuffleDep.getMergerLocs.map(_.host).mkString(", ")}") } else { - logInfo(s"No available merger locations. Shuffle merge disabled for $stage (${stage.name})") + logInfo(s"No available merger locations." + + s" Push-based shuffle disabled for $stage (${stage.name})") } } diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala index 311a9f23b4587..c12378d62d78c 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala @@ -704,12 +704,17 @@ class BlockManagerMasterEndpoint( } else { // Delta mergers added from inactive mergers list to the active mergers list val filteredMergersWithExecutorsHosts = filteredMergersWithExecutors.map(_.host) - // Pick random hosts instead of preferring the top of the list - val randomizedShuffleMergerLocations = Utils.randomize(shuffleMergerLocations.values.toSeq) - val filteredMergersWithoutExecutors = randomizedShuffleMergerLocations + val filteredMergersWithoutExecutors = shuffleMergerLocations.values .filterNot(x => hostsToFilter.contains(x.host)) .filterNot(x => filteredMergersWithExecutorsHosts.contains(x.host)) - filteredMergersWithExecutors.toSeq ++ filteredMergersWithoutExecutors + val randomFilteredMergersLocations = + if (filteredMergersWithoutExecutors.size > + numMergersNeeded - filteredMergersWithExecutors.size) { + Utils.randomize(filteredMergersWithoutExecutors) + } else { + filteredMergersWithoutExecutors + } + filteredMergersWithExecutors.toSeq ++ randomFilteredMergersLocations .take(numMergersNeeded - filteredMergersWithExecutors.size) } } From e320ac05810ddf3ea7a322d83964a3b17879a495 Mon Sep 17 00:00:00 2001 From: Venkata krishnan Sowrirajan Date: Mon, 16 Nov 2020 16:07:35 -0800 Subject: [PATCH 17/21] Address attila comments --- .../org/apache/spark/scheduler/DAGScheduler.scala | 10 +++++----- .../org/apache/spark/storage/BlockManagerId.scala | 2 ++ .../spark/storage/BlockManagerMasterEndpoint.scala | 7 ++++--- 
.../spark/scheduler/cluster/YarnSchedulerBackend.scala | 2 +- 4 files changed, 12 insertions(+), 9 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 128712ae273f2..389f1d54c3b97 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -1889,11 +1889,6 @@ private[spark] class DAGScheduler( // host, including from those that we still haven't confirmed as lost due to heartbeat // delays. ignoreShuffleFileLostEpoch = isHostDecommissioned) - - if (pushBasedShuffleEnabled) { - // Remove fetchFailed host in the shuffle push merger list for push based shuffle - env.blockManager.master.removeShufflePushMergerLocation(bmAddress.host) - } } } @@ -2067,6 +2062,11 @@ private[spark] class DAGScheduler( if (!executorFailureEpoch.contains(execId) || executorFailureEpoch(execId) < currentEpoch) { executorFailureEpoch(execId) = currentEpoch logInfo(s"Executor lost: $execId (epoch $currentEpoch)") + if (pushBasedShuffleEnabled) { + // Remove fetchFailed host in the shuffle push merger list for push based shuffle + hostToUnregisterOutputs.foreach( + host => blockManagerMaster.removeShufflePushMergerLocation(host)) + } blockManagerMaster.removeExecutor(execId) clearCacheLocs() } diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala index 49e32d04d450a..c6a4457d8f910 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala @@ -145,4 +145,6 @@ private[spark] object BlockManagerId { def getCachedBlockManagerId(id: BlockManagerId): BlockManagerId = { blockManagerIdCache.get(id) } + + private[spark] val SHUFFLE_MERGER_IDENTIFIER = "shuffle-push-merger" } diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala index c12378d62d78c..88fdadfc3355c 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala @@ -378,8 +378,8 @@ class BlockManagerMasterEndpoint( private def addMergerLocation(blockManagerId: BlockManagerId): Unit = { if (!blockManagerId.isDriver && !shuffleMergerLocations.contains(blockManagerId.host)) { - val shuffleServerId = BlockManagerId(blockManagerId.executorId, blockManagerId.host, - StorageUtils.externalShuffleServicePort(conf)) + val shuffleServerId = BlockManagerId(BlockManagerId.SHUFFLE_MERGER_IDENTIFIER, + blockManagerId.host, externalShuffleServicePort) if (shuffleMergerLocations.size >= maxRetainedMergerLocations) { shuffleMergerLocations -= shuffleMergerLocations.head._1 } @@ -696,7 +696,8 @@ class BlockManagerMasterEndpoint( val filteredBlockManagersWithExecutors = blockManagersWithExecutors .filterNot(x => hostsToFilter.contains(x.host)) val filteredMergersWithExecutors = filteredBlockManagersWithExecutors.map( - x => BlockManagerId(x.executorId, x.host, StorageUtils.externalShuffleServicePort(conf))) + x => BlockManagerId(BlockManagerId.SHUFFLE_MERGER_IDENTIFIER, + x.host, externalShuffleServicePort)) // Enough mergers are available as part of active executors list if (filteredMergersWithExecutors.size >= numMergersNeeded) { diff --git 
a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala index 04f6506a19f41..932e3ce914f97 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala @@ -194,10 +194,10 @@ private[spark] abstract class YarnSchedulerBackend( // and if its less than minMergersNeeded, we disable push based shuffle. val mergerLocations = blockManagerMaster .getShufflePushMergerLocations(numMergersDesired, scheduler.excludedNodes()) - logDebug(s"Num merger locations available ${mergerLocations.length}") if (mergerLocations.size < numMergersDesired && mergerLocations.size < minMergersNeeded) { Seq.empty[BlockManagerId] } else { + logDebug(s"Num merger locations available ${mergerLocations.length}") mergerLocations } } From a2d85efd2365a61295057f641b95481135829fa3 Mon Sep 17 00:00:00 2001 From: Venkata krishnan Sowrirajan Date: Mon, 16 Nov 2020 19:53:34 -0800 Subject: [PATCH 18/21] Addressed ngone51 review comment --- .../org/apache/spark/storage/BlockManagerMasterEndpoint.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala index 88fdadfc3355c..55f8ed482fbea 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala @@ -712,11 +712,11 @@ class BlockManagerMasterEndpoint( if (filteredMergersWithoutExecutors.size > numMergersNeeded - filteredMergersWithExecutors.size) { Utils.randomize(filteredMergersWithoutExecutors) + .take(numMergersNeeded - filteredMergersWithExecutors.size) } else { filteredMergersWithoutExecutors } filteredMergersWithExecutors.toSeq ++ randomFilteredMergersLocations - .take(numMergersNeeded - filteredMergersWithExecutors.size) } } From affa8a0089f093a7889321a897180b41a4c714ba Mon Sep 17 00:00:00 2001 From: Venkata krishnan Sowrirajan Date: Mon, 16 Nov 2020 20:21:50 -0800 Subject: [PATCH 19/21] Address attilapiros review comments --- .../spark/storage/BlockManagerMasterEndpoint.scala | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala index 55f8ed482fbea..cb9daa4933c60 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala @@ -691,14 +691,10 @@ class BlockManagerMasterEndpoint( private def getShufflePushMergerLocations( numMergersNeeded: Int, hostsToFilter: Set[String]): Seq[BlockManagerId] = { - val blockManagersWithExecutors = blockManagerIdByExecutor.groupBy(_._2.host) - .mapValues(_.head).values.map(_._2).toSet - val filteredBlockManagersWithExecutors = blockManagersWithExecutors - .filterNot(x => hostsToFilter.contains(x.host)) - val filteredMergersWithExecutors = filteredBlockManagersWithExecutors.map( - x => BlockManagerId(BlockManagerId.SHUFFLE_MERGER_IDENTIFIER, - x.host, externalShuffleServicePort)) - + val blockManagerHosts = blockManagerIdByExecutor.values.map(_.host).toSet + val 
filteredBlockManagerHosts = blockManagerHosts.filterNot(hostsToFilter.contains(_)) + val filteredMergersWithExecutors = filteredBlockManagerHosts.map( + BlockManagerId(BlockManagerId.SHUFFLE_MERGER_IDENTIFIER, _, externalShuffleServicePort)) // Enough mergers are available as part of active executors list if (filteredMergersWithExecutors.size >= numMergersNeeded) { filteredMergersWithExecutors.toSeq From 46f5670979b2244863a10ea4ab9664fd0ad89425 Mon Sep 17 00:00:00 2001 From: Venkata krishnan Sowrirajan Date: Tue, 17 Nov 2020 21:26:16 -0800 Subject: [PATCH 20/21] Add test in UtilsSuite to check push based shuffle is enabled or not --- .../scala/org/apache/spark/util/UtilsSuite.scala | 16 +++++++++++++--- 1 file changed, 13 insertions(+), 3 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala index 7ec7c5afca1df..ccd13331b8f62 100644 --- a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala @@ -17,9 +17,7 @@ package org.apache.spark.util -import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataOutput, DataOutputStream, File, - FileOutputStream, InputStream, PrintStream, SequenceInputStream} -import java.lang.{Double => JDouble, Float => JFloat} +import java.io._ import java.lang.reflect.Field import java.net.{BindException, ServerSocket, URI} import java.nio.{ByteBuffer, ByteOrder} @@ -42,6 +40,7 @@ import org.apache.hadoop.fs.Path import org.apache.spark.{SparkConf, SparkException, SparkFunSuite, TaskContext} import org.apache.spark.internal.Logging import org.apache.spark.internal.config._ +import org.apache.spark.internal.config.Tests.IS_TESTING import org.apache.spark.network.util.ByteUnit import org.apache.spark.scheduler.SparkListener import org.apache.spark.util.io.ChunkedByteBufferInputStream @@ -1406,6 +1405,17 @@ class UtilsSuite extends SparkFunSuite with ResetSystemProperties with Logging { assert(hostnamePort._1.equals("localhost")) assert(hostnamePort._2 === 0) } + + test("Test Push-based shuffle is enabled only when both" + + " spark.shuffle.service.enabled and spark.shuffle.push.enabled are enabled") { + val conf = new SparkConf() + assert(Utils.isPushBasedShuffleEnabled(conf) === false) + conf.set(PUSH_BASED_SHUFFLE_ENABLED, true) + conf.set(IS_TESTING, false) + assert(Utils.isPushBasedShuffleEnabled(conf) === false) + conf.set(SHUFFLE_SERVICE_ENABLED, true) + assert(Utils.isPushBasedShuffleEnabled(conf) === true) + } } private class SimpleExtension From 050a5ae31dfe2ee3a698cb68ba5227f5d42564a1 Mon Sep 17 00:00:00 2001 From: Venkata krishnan Sowrirajan Date: Tue, 17 Nov 2020 21:44:51 -0800 Subject: [PATCH 21/21] Address Dongjoon comments --- .../org/apache/spark/internal/config/package.scala | 14 +++++++------- .../org/apache/spark/scheduler/DAGScheduler.scala | 4 ++-- .../spark/storage/BlockManagerMasterEndpoint.scala | 2 +- .../main/scala/org/apache/spark/util/Utils.scala | 2 -- .../apache/spark/storage/BlockManagerSuite.scala | 4 ++-- .../scheduler/cluster/YarnSchedulerBackend.scala | 9 +++++---- 6 files changed, 17 insertions(+), 18 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala index 57bd73ef9a7f2..c12f3db6e5ffd 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/package.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala @@ -1941,7 +1941,7 @@ 
package object config { private[spark] val PUSH_BASED_SHUFFLE_ENABLED = ConfigBuilder("spark.shuffle.push.enabled") - .doc("Set to 'true' to enable push based shuffle on the client side and this works in " + + .doc("Set to 'true' to enable push-based shuffle on the client side and this works in " + "conjunction with the server side flag spark.shuffle.server.mergedShuffleFileManagerImpl " + "which needs to be set with the appropriate " + "org.apache.spark.network.shuffle.MergedShuffleFileManager implementation for push-based " + @@ -1950,9 +1950,9 @@ package object config { .booleanConf .createWithDefault(false) - private[spark] val SHUFFLE_MERGERS_MAX_RETAINED_LOCATIONS = + private[spark] val SHUFFLE_MERGER_MAX_RETAINED_LOCATIONS = ConfigBuilder("spark.shuffle.push.maxRetainedMergerLocations") - .doc("Maximum number of shuffle push mergers locations cached for push based shuffle. " + + .doc("Maximum number of shuffle push merger locations cached for push based shuffle. " + "Currently, shuffle push merger locations are nothing but external shuffle services " + "which are responsible for handling pushed blocks and merging them and serving " + "merged blocks for later shuffle fetch.") @@ -1973,15 +1973,15 @@ package object config { private[spark] val SHUFFLE_MERGER_LOCATIONS_MIN_STATIC_THRESHOLD = ConfigBuilder("spark.shuffle.push.mergersMinStaticThreshold") - .doc(s"The static threshold for number of shuffle push mergers locations should be " + + .doc(s"The static threshold for number of shuffle push merger locations should be " + "available in order to enable push based shuffle for a stage. Note this config " + s"works in conjunction with ${SHUFFLE_MERGER_LOCATIONS_MIN_THRESHOLD_RATIO.key}. " + - s"Maximum of spark.shuffle.push.mergersMinStaticThreshold and " + + "Maximum of spark.shuffle.push.mergersMinStaticThreshold and " + s"${SHUFFLE_MERGER_LOCATIONS_MIN_THRESHOLD_RATIO.key} ratio number of mergers needed to " + "enable push based shuffle for a stage. For eg: with 1000 partitions for the child " + - s"stage with spark.shuffle.push.mergersMinStaticThreshold as 5 and " + + "stage with spark.shuffle.push.mergersMinStaticThreshold as 5 and " + s"${SHUFFLE_MERGER_LOCATIONS_MIN_THRESHOLD_RATIO.key} set to 0.05, we would need " + - s"atleast 50 mergers to enable push based shuffle for that stage.") + "at least 50 mergers to enable push based shuffle for that stage.") .version("3.1.0") .doubleConf .createWithDefault(5) diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 389f1d54c3b97..6fb0fb93f253b 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -1263,7 +1263,7 @@ private[spark] class DAGScheduler( * locations for block push/merge by getting the historical locations of past executors. 
*/ private def prepareShuffleServicesForShuffleMapStage(stage: ShuffleMapStage): Unit = { - // TODO SPARK-32920: Handle stage reuse/retry cases separately as without finalize + // TODO(SPARK-32920) Handle stage reuse/retry cases separately as without finalize // TODO changes we cannot disable shuffle merge for the retry/reuse cases val mergerLocs = sc.schedulerBackend.getShufflePushMergerLocations( stage.shuffleDep.partitioner.numPartitions, stage.resourceProfileId) @@ -1276,7 +1276,7 @@ private[spark] class DAGScheduler( logDebug("List of shuffle push merger locations " + s"${stage.shuffleDep.getMergerLocs.map(_.host).mkString(", ")}") } else { - logInfo(s"No available merger locations." + + logInfo("No available merger locations." + s" Push-based shuffle disabled for $stage (${stage.name})") } } diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala index cb9daa4933c60..70810833251e3 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala @@ -80,7 +80,7 @@ class BlockManagerMasterEndpoint( private val shuffleMergerLocations = new mutable.LinkedHashMap[String, BlockManagerId]() // Maximum number of merger locations to cache - private val maxRetainedMergerLocations = conf.get(config.SHUFFLE_MERGERS_MAX_RETAINED_LOCATIONS) + private val maxRetainedMergerLocations = conf.get(config.SHUFFLE_MERGER_MAX_RETAINED_LOCATIONS) private val askThreadPool = ThreadUtils.newDaemonCachedThreadPool("block-manager-ask-thread-pool", 100) diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index cbf710afa631f..6ccf65b737c1a 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -2543,8 +2543,6 @@ private[spark] object Utils extends Logging { /** * Push based shuffle can only be enabled when external shuffle service is enabled. - * In the initial version, we cannot support pushed based shuffle and adaptive execution - * at the same time. Will improve this in a later version. 
*/ def isPushBasedShuffleEnabled(conf: SparkConf): Boolean = { conf.get(PUSH_BASED_SHUFFLE_ENABLED) && diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala index ca7c8f86fdbdf..144489c5f7922 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala @@ -1975,7 +1975,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE } } - test("Shuffle push merger locations should be bounded with in" + + test("SPARK-32919: Shuffle push merger locations should be bounded with in" + " spark.shuffle.push.retainedMergerLocations") { assert(master.getShufflePushMergerLocations(10, Set.empty).isEmpty) makeBlockManager(100, "execA", @@ -1994,7 +1994,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE assert(master.getShufflePushMergerLocations(10, Set("hostB")).size == 3) } - test("Prefer active executors locations for shuffle push mergers") { + test("SPARK-32919: Prefer active executor locations for shuffle push mergers") { makeBlockManager(100, "execA", transferService = Some(new MockBlockTransferService(10, "hostA"))) makeBlockManager(100, "execB", diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala index 932e3ce914f97..22002bb32004d 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala @@ -176,8 +176,8 @@ private[spark] abstract class YarnSchedulerBackend( override def getShufflePushMergerLocations( numPartitions: Int, resourceProfileId: Int): Seq[BlockManagerId] = { - // Currently this is naive way of calculating numMergersDesired for a stage. In future, - // we can use better heuristics to calculate numMergersDesired for a stage. + // TODO (SPARK-33481) This is a naive way of calculating numMergersDesired for a stage, + // TODO we can use better heuristics to calculate numMergersDesired for a stage. val maxExecutors = if (Utils.isDynamicAllocationEnabled(sc.getConf)) { maxNumExecutors } else { @@ -191,13 +191,14 @@ private[spark] abstract class YarnSchedulerBackend( math.floor(numMergersDesired * minMergersThresholdRatio).toInt) // Request for numMergersDesired shuffle mergers to BlockManagerMasterEndpoint - // and if its less than minMergersNeeded, we disable push based shuffle. + // and if it's less than minMergersNeeded, we disable push based shuffle. val mergerLocations = blockManagerMaster .getShufflePushMergerLocations(numMergersDesired, scheduler.excludedNodes()) if (mergerLocations.size < numMergersDesired && mergerLocations.size < minMergersNeeded) { Seq.empty[BlockManagerId] } else { - logDebug(s"Num merger locations available ${mergerLocations.length}") + logDebug(s"The number of shuffle mergers desired ${numMergersDesired}" + + s" and available locations are ${mergerLocations.length}") mergerLocations } }
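Taken together, the series settles on a small amount of logic that is worth restating outside the diffs. First, the merger-count arithmetic from YarnSchedulerBackend.getShufflePushMergerLocations in its final (patch 21) form. The sketch below is a standalone restatement, not the production signature: the SparkConf/ResourceProfile plumbing is replaced by plain parameters and the object name is invented. Note that numPartitions / tasksPerExecutor is integral division exactly as written in the patch, so the ceil acts on an already-truncated quotient.

object MergerCountHeuristicSketch {
  // Desired mergers: roughly one merger location per executor's worth of
  // reduce tasks, at least 1, capped by the executor count.
  def numMergersDesired(numPartitions: Int, tasksPerExecutor: Int, maxExecutors: Int): Int =
    math.min(
      // Integral division, as in the patch, before the ceil is applied.
      math.max(1, math.ceil(numPartitions / tasksPerExecutor).toInt),
      maxExecutors)

  // Minimum mergers: the larger of spark.shuffle.push.mergersMinStaticThreshold
  // and spark.shuffle.push.mergersMinThresholdRatio applied to the desired count.
  def minMergersNeeded(desired: Int, staticThreshold: Double, thresholdRatio: Double): Int =
    math.max(staticThreshold, math.floor(desired * thresholdRatio)).toInt

  // Acceptance check from the end of the method: an empty result is the
  // signal that disables push based shuffle for the stage.
  def accept(mergerLocations: Seq[String], desired: Int, minNeeded: Int): Seq[String] =
    if (mergerLocations.size < desired && mergerLocations.size < minNeeded) Seq.empty
    else mergerLocations
}

With the numbers from the config documentation's own example — 1000 partitions and the defaults of 5 and 0.05 — plus, say, 4 tasks per executor and 300 maximum executors (assumed figures), numMergersDesired is min(max(1, ceil(250)), 300) = 250 and minMergersNeeded is max(5, floor(250 * 0.05)) = 12. Worth noting: the mergersMinStaticThreshold doc text derives its "at least 50 mergers" figure by applying the ratio to the partition count, while the code applies it to numMergersDesired, so the documented example and the implementation can disagree.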
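Second, the selection and fallback logic that patches 11 through 19 repeatedly reshape in BlockManagerMasterEndpoint.getShufflePushMergerLocations. The sketch below restates where the series ends up, with BlockManagerId reduced to a bare host string, Utils.randomize stood in for by scala.util.Random.shuffle, and invented class and method names; the real code additionally wraps each host in a BlockManagerId carrying the SHUFFLE_MERGER_IDENTIFIER executor id and the external shuffle service port.

import scala.collection.mutable
import scala.util.Random

class MergerLocationSelectorSketch(maxRetained: Int) {
  // Insertion-ordered cache of every non-driver host that has registered a
  // block manager, bounded by spark.shuffle.push.maxRetainedMergerLocations.
  private val retained = new mutable.LinkedHashMap[String, String]()

  def addMergerLocation(host: String, isDriver: Boolean): Unit = {
    if (!isDriver && !retained.contains(host)) {
      if (retained.size >= maxRetained) retained -= retained.head._1 // evict oldest
      retained(host) = host
    }
  }

  // Dropped on fetch failure, mirroring removeShufflePushMergerLocation.
  def removeMergerLocation(host: String): Unit = {
    retained.remove(host)
  }

  def select(activeHosts: Set[String], numNeeded: Int, hostsToFilter: Set[String]): Seq[String] = {
    val withExecutors = activeHosts.filterNot(hostsToFilter.contains)
    if (withExecutors.size >= numNeeded) {
      withExecutors.toSeq
    } else {
      // Top up from retained hosts with no live executor, randomized only when
      // there are more candidates than needed (patch 18's fix keeps the take
      // inside the randomized branch).
      val withoutExecutors = retained.values
        .filterNot(hostsToFilter.contains)
        .filterNot(withExecutors.contains)
        .toSeq
      val delta = numNeeded - withExecutors.size
      val topUp =
        if (withoutExecutors.size > delta) Random.shuffle(withoutExecutors).take(delta)
        else withoutExecutors
      withExecutors.toSeq ++ topUp
    }
  }
}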
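Last, the enablement gate from Utils.isPushBasedShuffleEnabled, reduced to three plain booleans, with the UtilsSuite assertions from patch 20 inlined; the object and method names are invented for the sketch, and the final assertion is implied by the formula rather than stated in the suite.

object PushShuffleGateSketch {
  // spark.shuffle.push.enabled && (spark.testing || spark.shuffle.service.enabled)
  def enabled(pushEnabled: Boolean, isTesting: Boolean, shuffleServiceEnabled: Boolean): Boolean =
    pushEnabled && (isTesting || shuffleServiceEnabled)

  def main(args: Array[String]): Unit = {
    // Mirrors the UtilsSuite test added in patch 20.
    assert(!enabled(pushEnabled = false, isTesting = false, shuffleServiceEnabled = false))
    assert(!enabled(pushEnabled = true, isTesting = false, shuffleServiceEnabled = false))
    assert(enabled(pushEnabled = true, isTesting = false, shuffleServiceEnabled = true))
    // The IS_TESTING escape hatch lets suites exercise the push code path
    // without standing up an external shuffle service.
    assert(enabled(pushEnabled = true, isTesting = true, shuffleServiceEnabled = false))
  }
}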