diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
index a54bca800a007..59c0fd81475a7 100644
--- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
@@ -38,7 +38,7 @@ import org.apache.spark.internal.config.Tests.IS_TESTING
 import org.apache.spark.launcher.{LauncherBackend, SparkAppHandle}
 import org.apache.spark.network.netty.SparkTransportConf
 import org.apache.spark.network.shuffle.mesos.MesosExternalBlockStoreClient
-import org.apache.spark.rpc.{RpcEndpointAddress, RpcEndpointRef}
+import org.apache.spark.rpc.RpcEndpointAddress
 import org.apache.spark.scheduler.{SlaveLost, TaskSchedulerImpl}
 import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
 import org.apache.spark.util.Utils
@@ -61,9 +61,6 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
   extends CoarseGrainedSchedulerBackend(scheduler, sc.env.rpcEnv)
   with org.apache.mesos.Scheduler with MesosSchedulerUtils {
 
-  // Blacklist a slave after this many failures
-  private val MAX_SLAVE_FAILURES = 2
-
   private val maxCoresOption = conf.get(config.CORES_MAX)
 
   private val executorCoresOption = conf.getOption(config.EXECUTOR_CORES.key).map(_.toInt)
@@ -378,9 +375,18 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
 
       logDebug(s"Received ${offers.size} resource offers.")
 
+      // nodeBlacklist() currently only gets updated based on failures in spark tasks.
+      // If a mesos task fails to even start -- that is,
+      // if a spark executor fails to launch on a node -- nodeBlacklist does not get updated
+      // see SPARK-24567 for details
+      val blacklist = scheduler.nodeBlacklist()
       val (matchedOffers, unmatchedOffers) = offers.asScala.partition { offer =>
-        val offerAttributes = toAttributeMap(offer.getAttributesList)
-        matchesAttributeRequirements(slaveOfferConstraints, offerAttributes)
+        if (blacklist.contains(offer.getHostname)) {
+          false
+        } else {
+          val offerAttributes = toAttributeMap(offer.getAttributesList)
+          matchesAttributeRequirements(slaveOfferConstraints, offerAttributes)
+        }
       }
 
       declineUnmatchedOffers(d, unmatchedOffers)
@@ -583,7 +589,6 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
       cpus + totalCoresAcquired <= maxCores &&
       mem <= offerMem &&
       numExecutors < executorLimit &&
-      slaves.get(slaveId).map(_.taskFailures).getOrElse(0) < MAX_SLAVE_FAILURES &&
       meetsPortRequirements &&
       satisfiesLocality(offerHostname)
   }
@@ -659,14 +664,8 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
           totalGpusAcquired -= gpus
           gpusByTaskId -= taskId
         }
-        // If it was a failure, mark the slave as failed for blacklisting purposes
         if (TaskState.isFailed(state)) {
-          slave.taskFailures += 1
-
-          if (slave.taskFailures >= MAX_SLAVE_FAILURES) {
-            logInfo(s"Blacklisting Mesos slave $slaveId due to too many failures; " +
-              "is Spark installed on it?")
-          }
+          logError(s"Mesos task $taskId failed on Mesos slave $slaveId.")
         }
         executorTerminated(d, slaveId, taskId, s"Executor finished with state $state")
         // In case we'd rejected everything before but have now lost a node
@@ -796,7 +795,6 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
 
   private class Slave(val hostname: String) {
     val taskIDs = new mutable.HashSet[String]()
-    var taskFailures = 0
     var shuffleRegistered = false
   }
 
diff --git a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
index f810da17e6c44..d2e21b49caa95 100644
--- a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
+++ b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
@@ -36,7 +36,7 @@ import org.apache.spark.internal.config._
 import org.apache.spark.network.shuffle.mesos.MesosExternalBlockStoreClient
 import org.apache.spark.rpc.{RpcAddress, RpcEndpointRef}
 import org.apache.spark.scheduler.TaskSchedulerImpl
-import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.{RegisterExecutor}
+import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.RegisterExecutor
 import org.apache.spark.scheduler.cluster.mesos.Utils._
 
 class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
@@ -107,6 +107,28 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
     verifyTaskLaunched(driver, "o2")
   }
 
+  test("SPARK-19755 mesos declines offers from blacklisted slave") {
+    setBackend()
+
+    // launches a task on a valid offer on slave s1
+    val minMem = backend.executorMemory(sc) + 1024
+    val minCpu = 4
+    val offer1 = Resources(minMem, minCpu)
+    offerResources(List(offer1))
+    verifyTaskLaunched(driver, "o1")
+
+    // for any reason executor(aka mesos task) failed on s1
+    val status = createTaskStatus("0", "s1", TaskState.TASK_FAILED)
+    backend.statusUpdate(driver, status)
+    when(taskScheduler.nodeBlacklist()).thenReturn(Set("hosts1"))
+
+    val offer2 = Resources(minMem, minCpu)
+    // Offer resources from the same slave
+    offerResources(List(offer2))
+    // but since it's blacklisted the offer is declined
+    verifyDeclinedOffer(driver, createOfferId("o1"), true)
+  }
+
   test("mesos supports spark.executor.cores") {
     val executorCores = 4
     setBackend(Map(EXECUTOR_CORES.key -> executorCores.toString))