diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala index 5b5f449548988..59c0fd81475a7 100644 --- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala +++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala @@ -38,7 +38,7 @@ import org.apache.spark.internal.config.Tests.IS_TESTING import org.apache.spark.launcher.{LauncherBackend, SparkAppHandle} import org.apache.spark.network.netty.SparkTransportConf import org.apache.spark.network.shuffle.mesos.MesosExternalBlockStoreClient -import org.apache.spark.rpc.{RpcEndpointAddress, RpcEndpointRef} +import org.apache.spark.rpc.RpcEndpointAddress import org.apache.spark.scheduler.{SlaveLost, TaskSchedulerImpl} import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend import org.apache.spark.util.Utils @@ -375,9 +375,18 @@ private[spark] class MesosCoarseGrainedSchedulerBackend( logDebug(s"Received ${offers.size} resource offers.") + // nodeBlacklist() currently only gets updated based on failures in spark tasks. + // If a mesos task fails to even start -- that is, + // if a spark executor fails to launch on a node -- nodeBlacklist does not get updated + // see SPARK-24567 for details + val blacklist = scheduler.nodeBlacklist() val (matchedOffers, unmatchedOffers) = offers.asScala.partition { offer => - val offerAttributes = toAttributeMap(offer.getAttributesList) - matchesAttributeRequirements(slaveOfferConstraints, offerAttributes) + if (blacklist.contains(offer.getHostname)) { + false + } else { + val offerAttributes = toAttributeMap(offer.getAttributesList) + matchesAttributeRequirements(slaveOfferConstraints, offerAttributes) + } } declineUnmatchedOffers(d, unmatchedOffers) @@ -580,11 +589,6 @@ private[spark] class MesosCoarseGrainedSchedulerBackend( cpus + totalCoresAcquired <= maxCores && mem <= offerMem && numExecutors < executorLimit && - // nodeBlacklist() currently only gets updated based on failures in spark tasks. - // If a mesos task fails to even start -- that is, - // if a spark executor fails to launch on a node -- nodeBlacklist does not get updated - // see SPARK-24567 for details - !scheduler.nodeBlacklist().contains(offerHostname) && meetsPortRequirements && satisfiesLocality(offerHostname) } diff --git a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala index aa8fd1c86aa80..d2e21b49caa95 100644 --- a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala +++ b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala @@ -36,7 +36,7 @@ import org.apache.spark.internal.config._ import org.apache.spark.network.shuffle.mesos.MesosExternalBlockStoreClient import org.apache.spark.rpc.{RpcAddress, RpcEndpointRef} import org.apache.spark.scheduler.TaskSchedulerImpl -import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.{RegisterExecutor} +import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.RegisterExecutor import org.apache.spark.scheduler.cluster.mesos.Utils._ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite @@ -126,7 +126,7 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite // Offer resources from the same slave offerResources(List(offer2)) // but since it's blacklisted the offer is declined - verifyDeclinedOffer(driver, createOfferId("o1")) + verifyDeclinedOffer(driver, createOfferId("o1"), true) } test("mesos supports spark.executor.cores") {