From 2fc02885f70c789d1c7912c618534a644ec73021 Mon Sep 17 00:00:00 2001 From: Dongjoon Hyun Date: Wed, 25 Sep 2019 13:37:12 -0700 Subject: [PATCH 1/2] alternative --- .../MesosCoarseGrainedSchedulerBackend.scala | 16 ++++++++-------- ...MesosCoarseGrainedSchedulerBackendSuite.scala | 4 ++-- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala index 5b5f449548988..4a14fbbb62eb1 100644 --- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala +++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala @@ -38,7 +38,7 @@ import org.apache.spark.internal.config.Tests.IS_TESTING import org.apache.spark.launcher.{LauncherBackend, SparkAppHandle} import org.apache.spark.network.netty.SparkTransportConf import org.apache.spark.network.shuffle.mesos.MesosExternalBlockStoreClient -import org.apache.spark.rpc.{RpcEndpointAddress, RpcEndpointRef} +import org.apache.spark.rpc.RpcEndpointAddress import org.apache.spark.scheduler.{SlaveLost, TaskSchedulerImpl} import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend import org.apache.spark.util.Utils @@ -375,9 +375,14 @@ private[spark] class MesosCoarseGrainedSchedulerBackend( logDebug(s"Received ${offers.size} resource offers.") + val blacklist = scheduler.nodeBlacklist() val (matchedOffers, unmatchedOffers) = offers.asScala.partition { offer => - val offerAttributes = toAttributeMap(offer.getAttributesList) - matchesAttributeRequirements(slaveOfferConstraints, offerAttributes) + if (blacklist.contains(offer.getHostname)) { + false + } else { + val offerAttributes = toAttributeMap(offer.getAttributesList) + matchesAttributeRequirements(slaveOfferConstraints, offerAttributes) + } } declineUnmatchedOffers(d, unmatchedOffers) @@ -580,11 +585,6 @@ private[spark] class MesosCoarseGrainedSchedulerBackend( cpus + totalCoresAcquired <= maxCores && mem <= offerMem && numExecutors < executorLimit && - // nodeBlacklist() currently only gets updated based on failures in spark tasks. - // If a mesos task fails to even start -- that is, - // if a spark executor fails to launch on a node -- nodeBlacklist does not get updated - // see SPARK-24567 for details - !scheduler.nodeBlacklist().contains(offerHostname) && meetsPortRequirements && satisfiesLocality(offerHostname) } diff --git a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala index aa8fd1c86aa80..d2e21b49caa95 100644 --- a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala +++ b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala @@ -36,7 +36,7 @@ import org.apache.spark.internal.config._ import org.apache.spark.network.shuffle.mesos.MesosExternalBlockStoreClient import org.apache.spark.rpc.{RpcAddress, RpcEndpointRef} import org.apache.spark.scheduler.TaskSchedulerImpl -import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.{RegisterExecutor} +import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.RegisterExecutor import org.apache.spark.scheduler.cluster.mesos.Utils._ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite @@ -126,7 +126,7 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite // Offer resources from the same slave offerResources(List(offer2)) // but since it's blacklisted the offer is declined - verifyDeclinedOffer(driver, createOfferId("o1")) + verifyDeclinedOffer(driver, createOfferId("o1"), true) } test("mesos supports spark.executor.cores") { From 04f931d0348b4b6583d704a01b3421a4a8d91d3c Mon Sep 17 00:00:00 2001 From: Dongjoon Hyun Date: Wed, 25 Sep 2019 13:41:18 -0700 Subject: [PATCH 2/2] Recover comments --- .../cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala index 4a14fbbb62eb1..59c0fd81475a7 100644 --- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala +++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala @@ -375,6 +375,10 @@ private[spark] class MesosCoarseGrainedSchedulerBackend( logDebug(s"Received ${offers.size} resource offers.") + // nodeBlacklist() currently only gets updated based on failures in spark tasks. + // If a mesos task fails to even start -- that is, + // if a spark executor fails to launch on a node -- nodeBlacklist does not get updated + // see SPARK-24567 for details val blacklist = scheduler.nodeBlacklist() val (matchedOffers, unmatchedOffers) = offers.asScala.partition { offer => if (blacklist.contains(offer.getHostname)) {