@@ -357,9 +357,10 @@ private[spark] class MesosClusterScheduler(
val appJar = CommandInfo.URI.newBuilder()
.setValue(desc.jarUrl.stripPrefix("file:").stripPrefix("local:")).build()
val builder = CommandInfo.newBuilder().addUris(appJar)
- val entries =
-   (conf.getOption("spark.executor.extraLibraryPath").toList ++
-     desc.command.libraryPathEntries)
+ val entries = conf.getOption("spark.executor.extraLibraryPath")
+   .map(path => Seq(path) ++ desc.command.libraryPathEntries)
+   .getOrElse(desc.command.libraryPathEntries)
Contributor:
This seems more complicated than the original version. Any particular reason why you refactored it this way?

Contributor Author:
It was causing a null exception, as the toList returns Nil and it's then combined with ++.

Contributor:
@tnachen I don't understand: conf.getOption(...).toList returns Nil, but you can append to an empty list, can't you?
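For reference, appending to an empty Scala list is well defined: Nil ++ xs is just xs, so getOption(...).toList ++ entries can only fail if entries itself is null. A minimal sketch of the behavior, using a plain Map in place of SparkConf (the key and paths here are illustrative):

```scala
// A plain Map stands in for SparkConf.getOption in this sketch.
val conf = Map("spark.executor.extraLibraryPath" -> "/opt/native")

// Key absent: Option#toList yields Nil, and Nil ++ xs == xs; no NPE.
val whenMissing =
  Map.empty[String, String].get("spark.executor.extraLibraryPath").toList ++ Seq("/usr/lib")
assert(whenMissing == Seq("/usr/lib"))

// Key present: the configured path is simply prepended to the existing entries.
val whenPresent =
  conf.get("spark.executor.extraLibraryPath").toList ++ Seq("/usr/lib")
assert(whenPresent == Seq("/opt/native", "/usr/lib"))
```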

Contributor:
Also, what's the motivation for using a Java list instead? Now you have to convert to and from Scala lists in several places.
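For context on the conversion cost being discussed, this is roughly what the JavaConverters round-trip looks like once a field is typed as java.util.List; the collection contents here are illustrative:

```scala
import java.util.{List => JList}

import scala.collection.JavaConverters._

val scalaSeq: Seq[String] = Seq("cpus", "mem")

// Scala -> Java wrapper, needed wherever the Mesos API expects java.util.List.
val javaList: JList[String] = scalaSeq.asJava

// Java -> Scala wrapper, needed to get map/filter/find back on the result.
val roundTripped = javaList.asScala

// Each crossing is a cheap wrapper, but it adds noise at every call site,
// which is the concern about storing a java.util.List in the class.
assert(roundTripped == scalaSeq)
```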

Contributor Author:
Let me try again; when I was running the unit tests it gave me an NPE for some reason, and it's been a while since I ran this.

Contributor:
Agreed about Nil ++. Regarding Java <-> Scala conversions, I think this change is a net positive: it removed around 9 calls to asJava and introduced only 3 calls to asScala. :)


val prefixEnv = if (!entries.isEmpty) {
Utils.libraryPathEnvPrefix(entries)
} else {
@@ -442,9 +443,12 @@ private[spark] class MesosClusterScheduler(
options
}

- private class ResourceOffer(val offer: Offer, var cpu: Double, var mem: Double) {
+ private class ResourceOffer(
+     val offerId: OfferID,
Contributor:
In the future, please keep minor refactors like this separate from the patch; it's distracting to the reviewer and isn't needed to fix SPARK-10749. (For now I would just leave it here since you already did it.)

Contributor Author:
Sounds good, will remember this.

+     val slaveId: SlaveID,
+     var resources: JList[Resource]) {
override def toString(): String = {
-   s"Offer id: ${offer.getId.getValue}, cpu: $cpu, mem: $mem"
+   s"Offer id: ${offerId}, resources: ${resources}"
}
}

@@ -463,27 +467,29 @@
val driverMem = submission.mem
logTrace(s"Finding offer to launch driver with cpu: $driverCpu, mem: $driverMem")
val offerOption = currentOffers.find { o =>
- o.cpu >= driverCpu && o.mem >= driverMem
+ getResource(o.resources, "cpus") >= driverCpu &&
+   getResource(o.resources, "mem") >= driverMem
}
if (offerOption.isEmpty) {
logDebug(s"Unable to find offer to launch driver id: ${submission.submissionId}, " +
s"cpu: $driverCpu, mem: $driverMem")
} else {
val offer = offerOption.get
- offer.cpu -= driverCpu
- offer.mem -= driverMem
val taskId = TaskID.newBuilder().setValue(submission.submissionId).build()
- val cpuResource = createResource("cpus", driverCpu)
- val memResource = createResource("mem", driverMem)
+ val (remainingResources, cpuResourcesToUse) =
+   partitionResources(offer.resources, "cpus", driverCpu)
+ val (finalResources, memResourcesToUse) =
+   partitionResources(remainingResources.asJava, "mem", driverMem)
val commandInfo = buildDriverCommand(submission)
val appName = submission.schedulerProperties("spark.app.name")
val taskInfo = TaskInfo.newBuilder()
.setTaskId(taskId)
.setName(s"Driver for $appName")
- .setSlaveId(offer.offer.getSlaveId)
+ .setSlaveId(offer.slaveId)
.setCommand(commandInfo)
- .addResources(cpuResource)
- .addResources(memResource)
+ .addAllResources(cpuResourcesToUse.asJava)
+ .addAllResources(memResourcesToUse.asJava)
+ offer.resources = finalResources.asJava
submission.schedulerProperties.get("spark.mesos.executor.docker.image").foreach { image =>
val container = taskInfo.getContainerBuilder()
val volumes = submission.schedulerProperties
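For readers following the new resource accounting: partitionResources (from MesosSchedulerUtils) evidently splits an offer's resource list into what remains and what the task consumes, as the (remainingResources, cpuResourcesToUse) destructuring above shows. A simplified standalone sketch of that idea, using a hypothetical case class rather than the Mesos Resource protobuf:

```scala
// Hypothetical stand-in for the Mesos Resource proto.
case class Resource(name: String, role: String, amount: Double)

// Consume up to `amountToUse` of `resourceName`, drawing across roles;
// returns (resources left in the offer, resources assigned to the task).
def partitionResources(
    resources: List[Resource],
    resourceName: String,
    amountToUse: Double): (List[Resource], List[Resource]) = {
  var remain = amountToUse
  val remaining = List.newBuilder[Resource]
  val used = List.newBuilder[Resource]
  for (r <- resources) {
    if (r.name == resourceName && remain > 0) {
      val take = math.min(r.amount, remain)
      remain -= take
      used += r.copy(amount = take)
      // Any leftover portion of this resource stays in the offer.
      if (r.amount > take) remaining += r.copy(amount = r.amount - take)
    } else {
      remaining += r
    }
  }
  (remaining.result(), used.result())
}

// A 1.5-cpu driver can be satisfied from a "*" cpu plus part of a "role2" cpu,
// which is exactly what the multi-role test below exercises.
val offer = List(Resource("cpus", "*", 1.0), Resource("cpus", "role2", 1.0))
val (left, cpusToUse) = partitionResources(offer, "cpus", 1.5)
assert(cpusToUse.map(_.amount).sum == 1.5)
```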
@@ -496,11 +502,11 @@
container, image, volumes = volumes, portmaps = portmaps)
taskInfo.setContainer(container.build())
}
- val queuedTasks = tasks.getOrElseUpdate(offer.offer.getId, new ArrayBuffer[TaskInfo])
+ val queuedTasks = tasks.getOrElseUpdate(offer.offerId, new ArrayBuffer[TaskInfo])
queuedTasks += taskInfo.build()
- logTrace(s"Using offer ${offer.offer.getId.getValue} to launch driver " +
+ logTrace(s"Using offer ${offer.offerId.getValue} to launch driver " +
submission.submissionId)
- val newState = new MesosClusterSubmissionState(submission, taskId, offer.offer.getSlaveId,
+ val newState = new MesosClusterSubmissionState(submission, taskId, offer.slaveId,
None, new Date(), None)
launchedDrivers(submission.submissionId) = newState
launchedDriversState.persist(submission.submissionId, newState)
@@ -510,14 +516,14 @@
}

override def resourceOffers(driver: SchedulerDriver, offers: JList[Offer]): Unit = {
- val currentOffers = offers.asScala.map(o =>
-   new ResourceOffer(
-     o, getResource(o.getResourcesList, "cpus"), getResource(o.getResourcesList, "mem"))
- ).toList
- logTrace(s"Received offers from Mesos: \n${currentOffers.mkString("\n")}")
+ logTrace(s"Received offers from Mesos: \n${offers.asScala.mkString("\n")}")
val tasks = new mutable.HashMap[OfferID, ArrayBuffer[TaskInfo]]()
val currentTime = new Date()

+ val currentOffers = offers.asScala.map {
+   o => new ResourceOffer(o.getId, o.getSlaveId, o.getResourcesList)
+ }.toList
+
stateLock.synchronized {
// We first schedule all the supervised drivers that are ready to retry.
// This list will be empty if none of the drivers are marked as supervise.
@@ -541,9 +547,10 @@
tasks.foreach { case (offerId, taskInfos) =>
driver.launchTasks(Collections.singleton(offerId), taskInfos.asJava)
}
- offers.asScala
-   .filter(o => !tasks.keySet.contains(o.getId))
-   .foreach(o => driver.declineOffer(o.getId))
+
+ for (o <- currentOffers if !tasks.contains(o.offerId)) {
+   driver.declineOffer(o.offerId)
+ }
}

private def copyBuffer(
@@ -0,0 +1,139 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.scheduler.cluster.mesos

import java.util.{Collection, Collections, Date}

import scala.collection.JavaConverters._

import org.apache.mesos.Protos._
import org.apache.mesos.Protos.Value.{Scalar, Type}
import org.apache.mesos.SchedulerDriver
import org.mockito.{ArgumentCaptor, Matchers}
import org.mockito.Mockito._
import org.scalatest.mock.MockitoSugar

import org.apache.spark.{LocalSparkContext, SparkConf, SparkFunSuite}
import org.apache.spark.deploy.Command
import org.apache.spark.deploy.mesos.MesosDriverDescription


class MesosClusterSchedulerSuite extends SparkFunSuite with LocalSparkContext with MockitoSugar {

private val command = new Command("mainClass", Seq("arg"), Map(), Seq(), Seq(), Seq())
private var scheduler: MesosClusterScheduler = _

override def beforeEach(): Unit = {
val conf = new SparkConf()
conf.setMaster("mesos://localhost:5050")
conf.setAppName("spark mesos")
scheduler = new MesosClusterScheduler(
new BlackHoleMesosClusterPersistenceEngineFactory, conf) {
override def start(): Unit = { ready = true }
}
scheduler.start()
}

test("can queue drivers") {
val response = scheduler.submitDriver(
new MesosDriverDescription("d1", "jar", 1000, 1, true,
command, Map[String, String](), "s1", new Date()))
assert(response.success)
val response2 =
scheduler.submitDriver(new MesosDriverDescription(
"d1", "jar", 1000, 1, true, command, Map[String, String](), "s2", new Date()))
assert(response2.success)
val state = scheduler.getSchedulerState()
val queuedDrivers = state.queuedDrivers.toList
assert(queuedDrivers(0).submissionId == response.submissionId)
assert(queuedDrivers(1).submissionId == response2.submissionId)
}

test("can kill queued drivers") {
val response = scheduler.submitDriver(
new MesosDriverDescription("d1", "jar", 1000, 1, true,
command, Map[String, String](), "s1", new Date()))
assert(response.success)
val killResponse = scheduler.killDriver(response.submissionId)
assert(killResponse.success)
val state = scheduler.getSchedulerState()
assert(state.queuedDrivers.isEmpty)
}

test("can handle multiple roles") {
val driver = mock[SchedulerDriver]
val response = scheduler.submitDriver(
new MesosDriverDescription("d1", "jar", 1200, 1.5, true,
command,
Map(("spark.mesos.executor.home", "test"), ("spark.app.name", "test")),
"s1",
new Date()))
assert(response.success)
val offer = Offer.newBuilder()
.addResources(
Resource.newBuilder().setRole("*")
.setScalar(Scalar.newBuilder().setValue(1).build()).setName("cpus").setType(Type.SCALAR))
.addResources(
Resource.newBuilder().setRole("*")
.setScalar(Scalar.newBuilder().setValue(1000).build())
.setName("mem")
.setType(Type.SCALAR))
.addResources(
Resource.newBuilder().setRole("role2")
.setScalar(Scalar.newBuilder().setValue(1).build()).setName("cpus").setType(Type.SCALAR))
.addResources(
Resource.newBuilder().setRole("role2")
.setScalar(Scalar.newBuilder().setValue(500).build()).setName("mem").setType(Type.SCALAR))
.setId(OfferID.newBuilder().setValue("o1").build())
.setFrameworkId(FrameworkID.newBuilder().setValue("f1").build())
.setSlaveId(SlaveID.newBuilder().setValue("s1").build())
.setHostname("host1")
.build()

val capture = ArgumentCaptor.forClass(classOf[Collection[TaskInfo]])

when(
driver.launchTasks(
Matchers.eq(Collections.singleton(offer.getId)),
capture.capture())
).thenReturn(Status.valueOf(1))

scheduler.resourceOffers(driver, Collections.singletonList(offer))

val taskInfos = capture.getValue
assert(taskInfos.size() == 1)
val taskInfo = taskInfos.iterator().next()
val resources = taskInfo.getResourcesList
assert(scheduler.getResource(resources, "cpus") == 1.5)
assert(scheduler.getResource(resources, "mem") == 1200)
val resourcesSeq: Seq[Resource] = resources.asScala
val cpus = resourcesSeq.filter(_.getName.equals("cpus")).toList
assert(cpus.size == 2)
assert(cpus.exists(_.getRole().equals("role2")))
assert(cpus.exists(_.getRole().equals("*")))
val mem = resourcesSeq.filter(_.getName.equals("mem")).toList
assert(mem.size == 2)
assert(mem.exists(_.getRole().equals("role2")))
assert(mem.exists(_.getRole().equals("*")))

verify(driver, times(1)).launchTasks(
Matchers.eq(Collections.singleton(offer.getId)),
capture.capture()
)
}
}

(A file was also deleted in this change; its diff is not shown.)