From 934727b9267baefc7048765d355d0006fe7bb820 Mon Sep 17 00:00:00 2001
From: Timothy Chen
Date: Tue, 22 Sep 2015 14:20:29 -0700
Subject: [PATCH 1/2] Support multiple roles with Mesos cluster mode.
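
Previously each ResourceOffer tracked only aggregate cpu and mem totals and
built the driver's resources with createResource, which draws from the default
role only. Carrying the offer's full resource list through scheduling and
consuming it with partitionResources lets a driver draw cpus and mem from
every role the offer contains (e.g. "*" plus a framework-specific role).

A minimal sketch of that accounting, on a simplified (role, amount) model
rather than the Mesos protos:

    case class Res(role: String, amount: Double)

    // Consume `needed` units across roles, returning (remaining, used);
    // this mirrors what partitionResources does for "cpus" and "mem".
    def consume(resources: List[Res], needed: Double): (List[Res], List[Res]) = {
      var remain = needed
      val used = scala.collection.mutable.ArrayBuffer.empty[Res]
      val left = resources.map { r =>
        val take = math.min(r.amount, remain)
        remain -= take
        if (take > 0) used += Res(r.role, take)
        Res(r.role, r.amount - take)
      }
      (left.filter(_.amount > 0), used.toList)
    }

    // consume(List(Res("*", 1.0), Res("role2", 1.0)), 1.5)
    // -> (List(Res("role2", 0.5)), List(Res("*", 1.0), Res("role2", 0.5)))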
---
 .../cluster/mesos/MesosClusterScheduler.scala | 57 ++++++++++---------
 .../cluster/mesos/MesosSchedulerUtils.scala   |  4 +-
 2 files changed, 33 insertions(+), 28 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
index 8cda4ff0eb3b..e60a93253697 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
@@ -442,9 +442,13 @@ private[spark] class MesosClusterScheduler(
     options
   }
 
-  private class ResourceOffer(val offer: Offer, var cpu: Double, var mem: Double) {
+  private class ResourceOffer(
+      val offerId: OfferID,
+      val slaveId: SlaveID,
+      var resources: JList[Resource],
+      var used: Boolean) {
     override def toString(): String = {
-      s"Offer id: ${offer.getId.getValue}, cpu: $cpu, mem: $mem"
+      s"Offer id: ${offerId}, resources: ${resources}"
     }
   }
 
@@ -456,34 +460,37 @@ private[spark] class MesosClusterScheduler(
   private def scheduleTasks(
       candidates: Seq[MesosDriverDescription],
       afterLaunchCallback: (String) => Boolean,
-      currentOffers: List[ResourceOffer],
-      tasks: mutable.HashMap[OfferID, ArrayBuffer[TaskInfo]]): Unit = {
+      currentOffers: JList[ResourceOffer],
+      tasks: mutable.HashMap[OfferID, ArrayBuffer[TaskInfo]]): JList[ResourceOffer] = {
     for (submission <- candidates) {
       val driverCpu = submission.cores
       val driverMem = submission.mem
       logTrace(s"Finding offer to launch driver with cpu: $driverCpu, mem: $driverMem")
-      val offerOption = currentOffers.find { o =>
-        o.cpu >= driverCpu && o.mem >= driverMem
+      val offerOption = currentOffers.asScala.find { o =>
+        getResource(o.resources, "cpus") >= driverCpu &&
+          getResource(o.resources, "mem") >= driverMem
       }
       if (offerOption.isEmpty) {
         logDebug(s"Unable to find offer to launch driver id: ${submission.submissionId}, " +
           s"cpu: $driverCpu, mem: $driverMem")
       } else {
         val offer = offerOption.get
-        offer.cpu -= driverCpu
-        offer.mem -= driverMem
+        offer.used = true
         val taskId = TaskID.newBuilder().setValue(submission.submissionId).build()
-        val cpuResource = createResource("cpus", driverCpu)
-        val memResource = createResource("mem", driverMem)
+        val (remainingResources, cpuResourcesToUse) =
+          partitionResources(offer.resources, "cpus", driverCpu)
+        val (finalResources, memResourcesToUse) =
+          partitionResources(remainingResources, "mem", driverMem)
         val commandInfo = buildDriverCommand(submission)
         val appName = submission.schedulerProperties("spark.app.name")
         val taskInfo = TaskInfo.newBuilder()
           .setTaskId(taskId)
           .setName(s"Driver for $appName")
-          .setSlaveId(offer.offer.getSlaveId)
+          .setSlaveId(offer.slaveId)
           .setCommand(commandInfo)
-          .addResources(cpuResource)
-          .addResources(memResource)
+          .addAllResources(cpuResourcesToUse)
+          .addAllResources(memResourcesToUse)
+        offer.resources = finalResources
         submission.schedulerProperties.get("spark.mesos.executor.docker.image").foreach { image =>
           val container = taskInfo.getContainerBuilder()
           val volumes = submission.schedulerProperties
@@ -496,28 +503,28 @@ private[spark] class MesosClusterScheduler(
             container, image, volumes = volumes, portmaps = portmaps)
           taskInfo.setContainer(container.build())
         }
-        val queuedTasks = tasks.getOrElseUpdate(offer.offer.getId, new ArrayBuffer[TaskInfo])
+        val queuedTasks = tasks.getOrElseUpdate(offer.offerId, new ArrayBuffer[TaskInfo])
         queuedTasks += taskInfo.build()
-        logTrace(s"Using offer ${offer.offer.getId.getValue} to launch driver " +
+        logTrace(s"Using offer ${offer.offerId.getValue} to launch driver " +
          submission.submissionId)
-        val newState = new MesosClusterSubmissionState(submission, taskId, offer.offer.getSlaveId,
+        val newState = new MesosClusterSubmissionState(submission, taskId, offer.slaveId,
          None, new Date(), None)
         launchedDrivers(submission.submissionId) = newState
         launchedDriversState.persist(submission.submissionId, newState)
         afterLaunchCallback(submission.submissionId)
       }
     }
+    currentOffers
   }
 
   override def resourceOffers(driver: SchedulerDriver, offers: JList[Offer]): Unit = {
-    val currentOffers = offers.asScala.map(o =>
-      new ResourceOffer(
-        o, getResource(o.getResourcesList, "cpus"), getResource(o.getResourcesList, "mem"))
-    ).toList
-    logTrace(s"Received offers from Mesos: \n${currentOffers.mkString("\n")}")
+    logTrace(s"Received offers from Mesos: \n${offers.asScala.mkString("\n")}")
     val tasks = new mutable.HashMap[OfferID, ArrayBuffer[TaskInfo]]()
     val currentTime = new Date()
+    var currentOffers = offers.asScala.map {
+      o => new ResourceOffer(o.getId, o.getSlaveId, o.getResourcesList, false)
+    }.toList.asJava
     stateLock.synchronized {
       // We first schedule all the supervised drivers that are ready to retry.
       // This list will be empty if none of the drivers are marked as supervise.
@@ -525,14 +532,14 @@
         d.retryState.get.nextRetry.before(currentTime)
       }
 
-      scheduleTasks(
+      currentOffers = scheduleTasks(
        copyBuffer(driversToRetry),
        removeFromPendingRetryDrivers,
        currentOffers,
        tasks)
 
      // Then we walk through the queued drivers and try to schedule them.
-      scheduleTasks(
+      currentOffers = scheduleTasks(
        copyBuffer(queuedDrivers),
        removeFromQueuedDrivers,
        currentOffers,
@@ -541,9 +548,7 @@
    tasks.foreach { case (offerId, taskInfos) =>
      driver.launchTasks(Collections.singleton(offerId), taskInfos.asJava)
    }
-    offers.asScala
-      .filter(o => !tasks.keySet.contains(o.getId))
-      .foreach(o => driver.declineOffer(o.getId))
+    currentOffers.asScala.filter(!_.used).foreach(o => driver.declineOffer(o.offerId))
   }
 
   private def copyBuffer(
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
index a98f2f1fe5da..6ca2a3035828 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
@@ -179,7 +179,7 @@ private[mesos] trait MesosSchedulerUtils extends Logging {
   def partitionResources(
       resources: JList[Resource],
       resourceName: String,
-      amountToUse: Double): (List[Resource], List[Resource]) = {
+      amountToUse: Double): (JList[Resource], JList[Resource]) = {
     var remain = amountToUse
     var requestedResources = new ArrayBuffer[Resource]
     val remainingResources = resources.asScala.map {
@@ -202,7 +202,7 @@ private[mesos] trait MesosSchedulerUtils extends Logging {
     val filteredResources =
       remainingResources.filter(r => r.getType != Value.Type.SCALAR || r.getScalar.getValue > 0.0)
 
-    (filteredResources.toList, requestedResources.toList)
+    (filteredResources.toList.asJava, requestedResources.toList.asJava)
   }
 
   /** Helper method to get the key,value-set pair for a Mesos Attribute protobuf */
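
Note on the patch above: the find predicate can compare plain cpu/mem totals
because getResource sums a named scalar resource across all roles present in
the offer. A minimal sketch of that summing, again on a simplified model and
not the Mesos protos (the helper name is illustrative):

    case class Res(name: String, role: String, amount: Double)

    // Total amount of a named resource, regardless of which role holds it.
    def totalResource(resources: List[Res], name: String): Double =
      resources.filter(_.name == name).map(_.amount).sum

    // totalResource(List(Res("cpus", "*", 1.0), Res("cpus", "role2", 1.0)), "cpus") == 2.0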
From ebadaf3c55e839a00a8cf534e7b2fd2c8e988475 Mon Sep 17 00:00:00 2001
From: Timothy Chen
Date: Wed, 14 Oct 2015 09:11:09 +0800
Subject: [PATCH 2/2] Add unit test.
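
Besides adding a suite for MesosClusterScheduler, this round simplifies the
first patch: scheduleTasks takes a Scala List again and returns Unit, the
ResourceOffer.used flag is replaced by checking which offer ids ended up as
keys in the tasks map, and partitionResources returns Scala Lists, converting
with asJava only at the protobuf builder boundary. The conversions come from
scala.collection.JavaConverters, e.g. (values illustrative):

    import scala.collection.JavaConverters._

    val javaList: java.util.List[Int] = List(1, 2, 3).asJava   // Scala -> Java wrapper
    val backAgain: List[Int] = javaList.asScala.toList         // Java -> Scala copy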
---
 .../cluster/mesos/MesosClusterScheduler.scala |  42 +++---
 .../cluster/mesos/MesosSchedulerUtils.scala   |   4 +-
 .../mesos/MesosClusterSchedulerSuite.scala    | 139 ++++++++++++++++++
 .../mesos/MesosClusterSchedulerSuite.scala    |  74 ----------
 4 files changed, 163 insertions(+), 96 deletions(-)
 create mode 100644 core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala
 delete mode 100644 core/src/test/scala/org/apache/spark/scheduler/mesos/MesosClusterSchedulerSuite.scala

diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
index e60a93253697..2df7b1120b1c 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
@@ -357,9 +357,10 @@ private[spark] class MesosClusterScheduler(
     val appJar = CommandInfo.URI.newBuilder()
       .setValue(desc.jarUrl.stripPrefix("file:").stripPrefix("local:")).build()
     val builder = CommandInfo.newBuilder().addUris(appJar)
-    val entries =
-      (conf.getOption("spark.executor.extraLibraryPath").toList ++
-        desc.command.libraryPathEntries)
+    val entries = conf.getOption("spark.executor.extraLibraryPath")
+      .map(path => Seq(path) ++ desc.command.libraryPathEntries)
+      .getOrElse(desc.command.libraryPathEntries)
+
     val prefixEnv = if (!entries.isEmpty) {
       Utils.libraryPathEnvPrefix(entries)
     } else {
@@ -445,8 +446,7 @@ private[spark] class MesosClusterScheduler(
   private class ResourceOffer(
       val offerId: OfferID,
       val slaveId: SlaveID,
-      var resources: JList[Resource],
-      var used: Boolean) {
+      var resources: JList[Resource]) {
     override def toString(): String = {
       s"Offer id: ${offerId}, resources: ${resources}"
     }
@@ -460,13 +460,13 @@ private[spark] class MesosClusterScheduler(
   private def scheduleTasks(
       candidates: Seq[MesosDriverDescription],
       afterLaunchCallback: (String) => Boolean,
-      currentOffers: JList[ResourceOffer],
-      tasks: mutable.HashMap[OfferID, ArrayBuffer[TaskInfo]]): JList[ResourceOffer] = {
+      currentOffers: List[ResourceOffer],
+      tasks: mutable.HashMap[OfferID, ArrayBuffer[TaskInfo]]): Unit = {
     for (submission <- candidates) {
       val driverCpu = submission.cores
       val driverMem = submission.mem
       logTrace(s"Finding offer to launch driver with cpu: $driverCpu, mem: $driverMem")
-      val offerOption = currentOffers.asScala.find { o =>
+      val offerOption = currentOffers.find { o =>
         getResource(o.resources, "cpus") >= driverCpu &&
           getResource(o.resources, "mem") >= driverMem
       }
@@ -475,12 +475,11 @@
           s"cpu: $driverCpu, mem: $driverMem")
       } else {
         val offer = offerOption.get
-        offer.used = true
         val taskId = TaskID.newBuilder().setValue(submission.submissionId).build()
         val (remainingResources, cpuResourcesToUse) =
           partitionResources(offer.resources, "cpus", driverCpu)
         val (finalResources, memResourcesToUse) =
-          partitionResources(remainingResources, "mem", driverMem)
+          partitionResources(remainingResources.asJava, "mem", driverMem)
         val commandInfo = buildDriverCommand(submission)
         val appName = submission.schedulerProperties("spark.app.name")
         val taskInfo = TaskInfo.newBuilder()
@@ -488,9 +487,9 @@
           .setName(s"Driver for $appName")
           .setSlaveId(offer.slaveId)
           .setCommand(commandInfo)
-          .addAllResources(cpuResourcesToUse)
-          .addAllResources(memResourcesToUse)
-        offer.resources = finalResources
+          .addAllResources(cpuResourcesToUse.asJava)
+          .addAllResources(memResourcesToUse.asJava)
+        offer.resources = finalResources.asJava
         submission.schedulerProperties.get("spark.mesos.executor.docker.image").foreach { image =>
           val container = taskInfo.getContainerBuilder()
           val volumes = submission.schedulerProperties
@@ -514,7 +513,6 @@ private[spark] class MesosClusterScheduler(
         afterLaunchCallback(submission.submissionId)
       }
     }
-    currentOffers
   }
 
   override def resourceOffers(driver: SchedulerDriver, offers: JList[Offer]): Unit = {
@@ -522,9 +520,10 @@
     logTrace(s"Received offers from Mesos: \n${offers.asScala.mkString("\n")}")
     val tasks = new mutable.HashMap[OfferID, ArrayBuffer[TaskInfo]]()
     val currentTime = new Date()
-    var currentOffers = offers.asScala.map {
-      o => new ResourceOffer(o.getId, o.getSlaveId, o.getResourcesList, false)
-    }.toList.asJava
+    val currentOffers = offers.asScala.map {
+      o => new ResourceOffer(o.getId, o.getSlaveId, o.getResourcesList)
+    }.toList
+
     stateLock.synchronized {
       // We first schedule all the supervised drivers that are ready to retry.
       // This list will be empty if none of the drivers are marked as supervise.
@@ -532,14 +531,14 @@
         d.retryState.get.nextRetry.before(currentTime)
       }
 
-      currentOffers = scheduleTasks(
+      scheduleTasks(
        copyBuffer(driversToRetry),
        removeFromPendingRetryDrivers,
        currentOffers,
        tasks)
 
      // Then we walk through the queued drivers and try to schedule them.
-      currentOffers = scheduleTasks(
+      scheduleTasks(
        copyBuffer(queuedDrivers),
        removeFromQueuedDrivers,
        currentOffers,
@@ -548,7 +547,10 @@ private[spark] class MesosClusterScheduler(
    tasks.foreach { case (offerId, taskInfos) =>
      driver.launchTasks(Collections.singleton(offerId), taskInfos.asJava)
    }
-    currentOffers.asScala.filter(!_.used).foreach(o => driver.declineOffer(o.offerId))
+
+    for (o <- currentOffers if !tasks.contains(o.offerId)) {
+      driver.declineOffer(o.offerId)
+    }
   }
 
   private def copyBuffer(
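
Note: with the used flag gone, an offer counts as consumed exactly when its id
appears as a key in tasks, and every other offer is declined so Mesos can
re-offer its resources elsewhere. The shape of that check, with plain strings
standing in for OfferID (illustrative):

    val offerIds = Seq("o1", "o2", "o3")
    val tasks = Map("o2" -> Seq("driver-task"))
    val declined = offerIds.filterNot(tasks.contains)   // Seq("o1", "o3")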
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
index 6ca2a3035828..a98f2f1fe5da 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
@@ -179,7 +179,7 @@ private[mesos] trait MesosSchedulerUtils extends Logging {
   def partitionResources(
       resources: JList[Resource],
       resourceName: String,
-      amountToUse: Double): (JList[Resource], JList[Resource]) = {
+      amountToUse: Double): (List[Resource], List[Resource]) = {
     var remain = amountToUse
     var requestedResources = new ArrayBuffer[Resource]
     val remainingResources = resources.asScala.map {
@@ -202,7 +202,7 @@ private[mesos] trait MesosSchedulerUtils extends Logging {
     val filteredResources =
       remainingResources.filter(r => r.getType != Value.Type.SCALAR || r.getScalar.getValue > 0.0)
 
-    (filteredResources.toList.asJava, requestedResources.toList.asJava)
+    (filteredResources.toList, requestedResources.toList)
   }
 
   /** Helper method to get the key,value-set pair for a Mesos Attribute protobuf */
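
Note: the new suite below exercises the multi-role path by building an offer
whose cpus and mem are split between the default role "*" and "role2". A
helper in the style the test uses for role-scoped scalar resources (the helper
name and shape are illustrative, values arbitrary):

    import org.apache.mesos.Protos.{Resource, Value}

    def scalarResource(name: String, amount: Double, role: String): Resource =
      Resource.newBuilder()
        .setName(name)
        .setType(Value.Type.SCALAR)
        .setScalar(Value.Scalar.newBuilder().setValue(amount).build())
        .setRole(role)
        .build()

    // scalarResource("cpus", 1.0, "role2")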
diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala
new file mode 100644
index 000000000000..dbef6868f20e
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler.cluster.mesos
+
+import java.util.{Collection, Collections, Date}
+
+import scala.collection.JavaConverters._
+
+import org.apache.mesos.Protos._
+import org.apache.mesos.Protos.Value.{Scalar, Type}
+import org.apache.mesos.SchedulerDriver
+import org.mockito.{ArgumentCaptor, Matchers}
+import org.mockito.Mockito._
+import org.scalatest.mock.MockitoSugar
+
+import org.apache.spark.{LocalSparkContext, SparkConf, SparkFunSuite}
+import org.apache.spark.deploy.Command
+import org.apache.spark.deploy.mesos.MesosDriverDescription
+
+
+class MesosClusterSchedulerSuite extends SparkFunSuite with LocalSparkContext with MockitoSugar {
+
+  private val command = new Command("mainClass", Seq("arg"), Map(), Seq(), Seq(), Seq())
+  private var scheduler: MesosClusterScheduler = _
+
+  override def beforeEach(): Unit = {
+    val conf = new SparkConf()
+    conf.setMaster("mesos://localhost:5050")
+    conf.setAppName("spark mesos")
+    scheduler = new MesosClusterScheduler(
+      new BlackHoleMesosClusterPersistenceEngineFactory, conf) {
+      override def start(): Unit = { ready = true }
+    }
+    scheduler.start()
+  }
+
+  test("can queue drivers") {
+    val response = scheduler.submitDriver(
+      new MesosDriverDescription("d1", "jar", 1000, 1, true,
+        command, Map[String, String](), "s1", new Date()))
+    assert(response.success)
+    val response2 =
+      scheduler.submitDriver(new MesosDriverDescription(
+        "d1", "jar", 1000, 1, true, command, Map[String, String](), "s2", new Date()))
+    assert(response2.success)
+    val state = scheduler.getSchedulerState()
+    val queuedDrivers = state.queuedDrivers.toList
+    assert(queuedDrivers(0).submissionId == response.submissionId)
+    assert(queuedDrivers(1).submissionId == response2.submissionId)
+  }
+
+  test("can kill queued drivers") {
+    val response = scheduler.submitDriver(
+      new MesosDriverDescription("d1", "jar", 1000, 1, true,
+        command, Map[String, String](), "s1", new Date()))
+    assert(response.success)
+    val killResponse = scheduler.killDriver(response.submissionId)
+    assert(killResponse.success)
+    val state = scheduler.getSchedulerState()
+    assert(state.queuedDrivers.isEmpty)
+  }
+
+  test("can handle multiple roles") {
+    val driver = mock[SchedulerDriver]
+    val response = scheduler.submitDriver(
+      new MesosDriverDescription("d1", "jar", 1200, 1.5, true,
+        command,
+        Map(("spark.mesos.executor.home", "test"), ("spark.app.name", "test")),
+        "s1",
+        new Date()))
+    assert(response.success)
+    val offer = Offer.newBuilder()
+      .addResources(
+        Resource.newBuilder().setRole("*")
+          .setScalar(Scalar.newBuilder().setValue(1).build()).setName("cpus").setType(Type.SCALAR))
+      .addResources(
+        Resource.newBuilder().setRole("*")
+          .setScalar(Scalar.newBuilder().setValue(1000).build())
+          .setName("mem")
+          .setType(Type.SCALAR))
+      .addResources(
+        Resource.newBuilder().setRole("role2")
+          .setScalar(Scalar.newBuilder().setValue(1).build()).setName("cpus").setType(Type.SCALAR))
+      .addResources(
+        Resource.newBuilder().setRole("role2")
+          .setScalar(Scalar.newBuilder().setValue(500).build()).setName("mem").setType(Type.SCALAR))
+      .setId(OfferID.newBuilder().setValue("o1").build())
+      .setFrameworkId(FrameworkID.newBuilder().setValue("f1").build())
+      .setSlaveId(SlaveID.newBuilder().setValue("s1").build())
+      .setHostname("host1")
+      .build()
+
+    val capture = ArgumentCaptor.forClass(classOf[Collection[TaskInfo]])
+
+    when(
+      driver.launchTasks(
+        Matchers.eq(Collections.singleton(offer.getId)),
+        capture.capture())
+    ).thenReturn(Status.valueOf(1))
+
+    scheduler.resourceOffers(driver, Collections.singletonList(offer))
+
+    val taskInfos = capture.getValue
+    assert(taskInfos.size() == 1)
+    val taskInfo = taskInfos.iterator().next()
+    val resources = taskInfo.getResourcesList
+    assert(scheduler.getResource(resources, "cpus") == 1.5)
+    assert(scheduler.getResource(resources, "mem") == 1200)
+    val resourcesSeq: Seq[Resource] = resources.asScala
+    val cpus = resourcesSeq.filter(_.getName.equals("cpus")).toList
+    assert(cpus.size == 2)
+    assert(cpus.exists(_.getRole().equals("role2")))
+    assert(cpus.exists(_.getRole().equals("*")))
+    val mem = resourcesSeq.filter(_.getName.equals("mem")).toList
+    assert(mem.size == 2)
+    assert(mem.exists(_.getRole().equals("role2")))
+    assert(mem.exists(_.getRole().equals("*")))
+
+    verify(driver, times(1)).launchTasks(
+      Matchers.eq(Collections.singleton(offer.getId)),
+      capture.capture()
+    )
+  }
+}
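
Note: the "can handle multiple roles" test verifies the launched TaskInfo by
capturing the argument passed to the mocked SchedulerDriver rather than by
inspecting scheduler state. A minimal, self-contained version of that
capture-and-verify pattern (the Launcher trait here is illustrative, not part
of the Mesos API):

    import org.mockito.{ArgumentCaptor, Mockito}

    trait Launcher { def launch(tasks: java.util.List[String]): Unit }

    val launcher = Mockito.mock(classOf[Launcher])
    launcher.launch(java.util.Arrays.asList("task-1"))

    val captor = ArgumentCaptor.forClass(classOf[java.util.List[String]])
    Mockito.verify(launcher).launch(captor.capture())
    assert(captor.getValue.get(0) == "task-1")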
diff --git a/core/src/test/scala/org/apache/spark/scheduler/mesos/MesosClusterSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/mesos/MesosClusterSchedulerSuite.scala
deleted file mode 100644
index 98fdc58786ec..000000000000
--- a/core/src/test/scala/org/apache/spark/scheduler/mesos/MesosClusterSchedulerSuite.scala
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.scheduler.mesos
-
-import java.util.Date
-
-import org.scalatest.mock.MockitoSugar
-
-import org.apache.spark.{LocalSparkContext, SparkConf, SparkFunSuite}
-import org.apache.spark.deploy.Command
-import org.apache.spark.deploy.mesos.MesosDriverDescription
-import org.apache.spark.scheduler.cluster.mesos._
-
-class MesosClusterSchedulerSuite extends SparkFunSuite with LocalSparkContext with MockitoSugar {
-
-  private val command = new Command("mainClass", Seq("arg"), null, null, null, null)
-
-  test("can queue drivers") {
-    val conf = new SparkConf()
-    conf.setMaster("mesos://localhost:5050")
-    conf.setAppName("spark mesos")
-    val scheduler = new MesosClusterScheduler(
-      new BlackHoleMesosClusterPersistenceEngineFactory, conf) {
-      override def start(): Unit = { ready = true }
-    }
-    scheduler.start()
-    val response = scheduler.submitDriver(
-      new MesosDriverDescription("d1", "jar", 1000, 1, true,
-        command, Map[String, String](), "s1", new Date()))
-    assert(response.success)
-    val response2 =
-      scheduler.submitDriver(new MesosDriverDescription(
-        "d1", "jar", 1000, 1, true, command, Map[String, String](), "s2", new Date()))
-    assert(response2.success)
-    val state = scheduler.getSchedulerState()
-    val queuedDrivers = state.queuedDrivers.toList
-    assert(queuedDrivers(0).submissionId == response.submissionId)
-    assert(queuedDrivers(1).submissionId == response2.submissionId)
-  }
-
-  test("can kill queued drivers") {
-    val conf = new SparkConf()
-    conf.setMaster("mesos://localhost:5050")
-    conf.setAppName("spark mesos")
-    val scheduler = new MesosClusterScheduler(
-      new BlackHoleMesosClusterPersistenceEngineFactory, conf) {
-      override def start(): Unit = { ready = true }
-    }
-    scheduler.start()
-    val response = scheduler.submitDriver(
-      new MesosDriverDescription("d1", "jar", 1000, 1, true,
-        command, Map[String, String](), "s1", new Date()))
-    assert(response.success)
-    val killResponse = scheduler.killDriver(response.submissionId)
-    assert(killResponse.success)
-    val state = scheduler.getSchedulerState()
-    assert(state.queuedDrivers.isEmpty)
-  }
-}