diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index 8df37c247d0d4..a16e297c8d23e 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -21,14 +21,15 @@ import java.nio.ByteBuffer
 import java.util.{TimerTask, Timer}
 import java.util.concurrent.atomic.AtomicLong
 
-import scala.concurrent.duration._
 import scala.collection.mutable.ArrayBuffer
-import scala.collection.mutable.HashMap
-import scala.collection.mutable.HashSet
+import scala.collection.mutable.{HashMap, HashSet, Map}
+import scala.concurrent.duration._
+
 import org.apache.spark._
 import org.apache.spark.TaskState.TaskState
 import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
+import org.apache.spark.util.collection.LRUMap
 
 /**
  * Schedules tasks for multiple types of clusters by acting through a SchedulerBackend.
@@ -195,11 +196,13 @@ private[spark] class TaskSchedulerImpl(
    * sets for tasks in order of priority. We fill each node with tasks in a round-robin manner so
    * that tasks are balanced across the cluster.
    */
-  def resourceOffers(offers: Seq[WorkerOffer]): Seq[Seq[TaskDescription]] = synchronized {
+  def resourceOffers(execWorkerOffers: Map[String, WorkerOffer]): Seq[Seq[TaskDescription]] = synchronized {
     SparkEnv.set(sc.env)
 
+    val workerOffers = execWorkerOffers.values
+
     // Mark each slave as alive and remember its hostname
-    for (o <- offers) {
+    for (o <- workerOffers) {
       executorIdToHost(o.executorId) = o.host
       if (!executorsByHost.contains(o.host)) {
         executorsByHost(o.host) = new HashSet[String]()
@@ -208,8 +211,8 @@ private[spark] class TaskSchedulerImpl(
     }
 
     // Build a list of tasks to assign to each worker
-    val tasks = offers.map(o => new ArrayBuffer[TaskDescription](o.cores))
-    val availableCpus = offers.map(o => o.cores).toArray
+    val tasks = workerOffers.map(o => new ArrayBuffer[TaskDescription](o.cores)).toArray
+    val availableCpus = workerOffers.map(o => o.cores).toArray
     val sortedTaskSets = rootPool.getSortedTaskSetQueue()
     for (taskSet <- sortedTaskSets) {
       logDebug("parentName: %s, name: %s, runningTasks: %s".format(
@@ -222,16 +225,20 @@
     for (taskSet <- sortedTaskSets; maxLocality <- TaskLocality.values) {
       do {
         launchedTask = false
-        for (i <- 0 until offers.size) {
-          val execId = offers(i).executorId
-          val host = offers(i).host
+        for (i <- 0 until workerOffers.size) {
+          // will not trigger LRU moving
+          val candidateOffer = execWorkerOffers.head._2
+          val execId = candidateOffer.executorId
+          val host = candidateOffer.host
           for (task <- taskSet.resourceOffer(execId, host, availableCpus(i), maxLocality)) {
             tasks(i) += task
             val tid = task.taskId
+            // trigger LRU
+            val selectedExecId = execWorkerOffers.get(execWorkerOffers.head._1).get.executorId
             taskIdToTaskSetId(tid) = taskSet.taskSet.id
-            taskIdToExecutorId(tid) = execId
-            activeExecutorIds += execId
-            executorsByHost(host) += execId
+            taskIdToExecutorId(tid) = selectedExecId
+            activeExecutorIds += selectedExecId
+            executorsByHost(host) += selectedExecId
             availableCpus(i) -= 1
             launchedTask = true
           }
@@ -458,4 +465,18 @@ private[spark] object TaskSchedulerImpl {
     retval.toList
   }
+
+
+  def buildMapFromWorkerOffers(scheduler: TaskSchedulerImpl, workerOffers: Seq[WorkerOffer]) = {
+    val map = {
+      if (scheduler.conf.getBoolean("spark.scheduler.lruOffer", false)) {
+        new LRUMap[String, WorkerOffer](1000)
+      }
+      else {
+        new HashMap[String, WorkerOffer]
+      }
+    }
+    for (workerOffer <- workerOffers) map += workerOffer.executorId -> workerOffer
+    map
+  }
 
 }
diff --git a/core/src/main/scala/org/apache/spark/scheduler/WorkerOffer.scala b/core/src/main/scala/org/apache/spark/scheduler/WorkerOffer.scala
index ba6bab3f91a65..550c5215a5738 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/WorkerOffer.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/WorkerOffer.scala
@@ -21,4 +21,4 @@ package org.apache.spark.scheduler
  * Represents free resources available on an executor.
  */
 private[spark]
-class WorkerOffer(val executorId: String, val host: String, val cores: Int)
+class WorkerOffer(val executorId: String, val host: String, var cores: Int)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 379e02eb9a437..f264a94e69743 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -31,6 +31,7 @@ import org.apache.spark.{Logging, SparkException, TaskState}
 import org.apache.spark.scheduler.{SchedulerBackend, SlaveLost, TaskDescription, TaskSchedulerImpl, WorkerOffer}
 import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
 import org.apache.spark.util.{AkkaUtils, Utils}
+import org.apache.spark.util.collection.LRUMap
 
 /**
  * A scheduler backend that waits for coarse grained executors to connect to it through Akka.
@@ -50,12 +51,23 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
   private val timeout = AkkaUtils.askTimeout(conf)
 
   class DriverActor(sparkProperties: Seq[(String, String)]) extends Actor {
+
+    private val schedulingMode = conf.getBoolean("spark.scheduler.lruOffer", false)
+
     private val executorActor = new HashMap[String, ActorRef]
     private val executorAddress = new HashMap[String, Address]
     private val executorHost = new HashMap[String, String]
-    private val freeCores = new HashMap[String, Int]
+    private val executorWorkerOffer = {
+      if (schedulingMode) {
+        new LRUMap[String, WorkerOffer](1000)
+      }
+      else {
+        new HashMap[String, WorkerOffer]
+      }
+    }
     private val addressToExecutorId = new HashMap[Address, String]
 
+
     override def preStart() {
       // Listen for remote client disconnection events, since they don't go through Akka's watch()
       context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
@@ -76,7 +88,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
           sender ! RegisteredExecutor(sparkProperties)
           executorActor(executorId) = sender
           executorHost(executorId) = Utils.parseHostPort(hostPort)._1
-          freeCores(executorId) = cores
+          executorWorkerOffer(executorId) = new WorkerOffer(executorId, executorHost(executorId), cores)
           executorAddress(executorId) = sender.path.address
           addressToExecutorId(sender.path.address) = executorId
           totalCoreCount.addAndGet(cores)
@@ -87,7 +99,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
       scheduler.statusUpdate(taskId, state, data.value)
       if (TaskState.isFinished(state)) {
         if (executorActor.contains(executorId)) {
-          freeCores(executorId) += 1
+          executorWorkerOffer(executorId).cores += 1
           makeOffers(executorId)
         } else {
           // Ignoring the update since we don't know about the executor.
@@ -124,34 +136,33 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
     }
 
     // Make fake resource offers on all executors
-    def makeOffers() {
-      launchTasks(scheduler.resourceOffers(
-        executorHost.toArray.map {case (id, host) => new WorkerOffer(id, host, freeCores(id))}))
+    private[this] def makeOffers() {
+      launchTasks(scheduler.resourceOffers(executorWorkerOffer))
     }
 
     // Make fake resource offers on just one executor
-    def makeOffers(executorId: String) {
+    private[this] def makeOffers(executorId: String) {
       launchTasks(scheduler.resourceOffers(
-        Seq(new WorkerOffer(executorId, executorHost(executorId), freeCores(executorId)))))
+        TaskSchedulerImpl.buildMapFromWorkerOffers(scheduler, Seq(executorWorkerOffer(executorId)))))
     }
 
     // Launch tasks returned by a set of resource offers
-    def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
+    private[this] def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
       for (task <- tasks.flatten) {
-        freeCores(task.executorId) -= 1
+        executorWorkerOffer(task.executorId).cores -= 1
         executorActor(task.executorId) ! LaunchTask(task)
       }
     }
 
     // Remove a disconnected slave from the cluster
-    def removeExecutor(executorId: String, reason: String) {
+    private[this] def removeExecutor(executorId: String, reason: String) {
       if (executorActor.contains(executorId)) {
         logInfo("Executor " + executorId + " disconnected, so removing it")
-        val numCores = freeCores(executorId)
+        val numCores = executorWorkerOffer(executorId).cores
         addressToExecutorId -= executorAddress(executorId)
         executorActor -= executorId
         executorHost -= executorId
-        freeCores -= executorId
+        executorWorkerOffer -= executorId
         totalCoreCount.addAndGet(-numCores)
         scheduler.executorLost(executorId, SlaveLost(reason))
       }
@@ -173,7 +184,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
       Props(new DriverActor(properties)), name = CoarseGrainedSchedulerBackend.ACTOR_NAME)
   }
 
-  def stopExecutors() {
+  private[this] def stopExecutors() {
     try {
       if (driverActor != null) {
         logInfo("Shutting down all executors")
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
index bcf0ce19a54cd..505851945f90d 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
@@ -202,10 +202,9 @@ private[spark] class MesosSchedulerBackend(
           offer.getHostname,
           getResource(offer.getResourcesList, "cpus").toInt)
       }
-
-      // Call into the TaskSchedulerImpl
-      val taskLists = scheduler.resourceOffers(offerableWorkers)
-
+      // Call into the ClusterScheduler
+      val taskLists = scheduler.resourceOffers(
+        TaskSchedulerImpl.buildMapFromWorkerOffers(scheduler, offerableWorkers))
       // Build a list of Mesos tasks for each slave
       val mesosTasks = offers.map(o => Collections.emptyList[MesosTaskInfo]())
       for ((taskList, index) <- taskLists.zipWithIndex) {
diff --git a/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
index 16e2f5cf3076d..1e482ce871c73 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
@@ -67,7 +67,8 @@ private[spark] class LocalActor(
   def reviveOffers() {
     val offers = Seq(new WorkerOffer(localExecutorId, localExecutorHostname, freeCores))
-    for (task <- scheduler.resourceOffers(offers).flatten) {
+    for (task <- scheduler.resourceOffers(
+      TaskSchedulerImpl.buildMapFromWorkerOffers(scheduler, offers)).flatten) {
       freeCores -= 1
       executor.launchTask(executorBackend, task.taskId, task.serializedTask)
     }
   }
diff --git a/core/src/main/scala/org/apache/spark/util/collection/LRUMap.scala b/core/src/main/scala/org/apache/spark/util/collection/LRUMap.scala
new file mode 100644
index 0000000000000..d74fc69bd3781
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/util/collection/LRUMap.scala
@@ -0,0 +1,15 @@
+package org.apache.spark.util.collection
+
+import scala.collection.mutable
+import java.util
+import scala.collection.convert.Wrappers
+import org.apache.commons.collections.map.{LRUMap => JLRUMap}
+
+class LRUMap[K, V](maxSize: Int, underlying: util.Map[K, V]) extends Wrappers.JMapWrapper(underlying) {
+
+  def this(maxSize: Int) = this(maxSize, new JLRUMap(maxSize).asInstanceOf[util.Map[K, V]])
+}
+
+
+class LRUSyncMap[K, V](maxSize: Int, underlying: util.Map[K, V])
+  extends LRUMap[K, V](maxSize, underlying: util.Map[K, V]) with mutable.SynchronizedMap[K, V]
\ No newline at end of file
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
index 20f6e503872ac..6420d4029e45f 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
@@ -25,6 +25,7 @@ import org.scalatest.FunSuite
 import org.apache.spark._
 import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.util.FakeClock
+import org.apache.spark.util.collection.LRUMap
 
 class FakeDAGScheduler(taskScheduler: FakeTaskScheduler) extends DAGScheduler(taskScheduler) {
   override def taskStarted(task: Task[_], taskInfo: TaskInfo) {
@@ -298,6 +299,32 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
     }
   }
 
+  test ("executors should be scheduled with LRU order when the feature is enabled") {
+    sc = new SparkContext("local", "test")
+    val sched = new FakeTaskScheduler(sc,
+      ("exec1", "host1"), ("exec2", "host2"), ("exec3", "host3"))
+    sched.rootPool = new Pool("", sched.schedulingMode, 0, 0)
+    sched.schedulableBuilder = new FIFOSchedulableBuilder(sched.rootPool)
+    val executorWorkerOffer = new LRUMap[String, WorkerOffer](3)
+    executorWorkerOffer += "exec1" -> new WorkerOffer("exec1", "host1", 8)
+    executorWorkerOffer += "exec2" -> new WorkerOffer("exec2", "host2", 8)
+    executorWorkerOffer += "exec3" -> new WorkerOffer("exec3", "host3", 8)
+    val taskSet1 = createTaskSet(1)
+    val taskSet2 = createTaskSet(1)
+    val taskSet3 = createTaskSet(1)
+    val clock = new FakeClock
+    val manager1 = new TaskSetManager(sched, taskSet1, MAX_TASK_FAILURES, clock)
+    val manager2 = new TaskSetManager(sched, taskSet2, MAX_TASK_FAILURES, clock)
+    val manager3 = new TaskSetManager(sched, taskSet3, MAX_TASK_FAILURES, clock)
+    sched.schedulableBuilder.addTaskSetManager(manager1, manager1.taskSet.properties)
+    sched.schedulableBuilder.addTaskSetManager(manager2, manager2.taskSet.properties)
+    sched.schedulableBuilder.addTaskSetManager(manager3, manager3.taskSet.properties)
+    sched.resourceOffers(executorWorkerOffer)
+    assert(sched.taskIdToExecutorId.values.toList(0) == "exec3")
+    assert(sched.taskIdToExecutorId.values.toList(1) == "exec2")
+    assert(sched.taskIdToExecutorId.values.toList(2) == "exec1")
+  }
+
   /**
    * Utility method to create a TaskSet, potentially setting a particular sequence of preferred
diff --git a/docs/configuration.md b/docs/configuration.md
index 8e4c48c81f8be..95a5456b05a73 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -200,6 +200,14 @@ Apart from these, the following properties are also available, and may be useful
     to use fair sharing instead of queueing jobs one after another. Useful for multi-user services.
+
+
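
A minimal sketch of the offer-map behaviour this patch relies on, not part of the diff above. It assumes the usual commons-collections `LRUMap` semantics inherited by the new wrapper: iteration runs from least recently used to most recently used, and `get` promotes an entry to most recently used, which is what the `execWorkerOffers.head` / `execWorkerOffers.get(...)` pair in `resourceOffers` depends on. The `spark.scheduler.lruOffer` key and the capacity of 1000 come from the patch; the value type and the assertions are illustrative only.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.util.collection.LRUMap

// Opt in to LRU-ordered offers (configuration key introduced by this patch).
val conf = new SparkConf().set("spark.scheduler.lruOffer", "true")

// Illustrative map of executor id -> free cores; the scheduler itself stores WorkerOffers.
val offers = new LRUMap[String, Int](1000)
offers += "exec1" -> 8
offers += "exec2" -> 8
offers += "exec3" -> 8

// head inspects the least recently used entry without reordering ("will not trigger LRU moving").
assert(offers.head._1 == "exec1")

// get() reaches the underlying commons-collections map and promotes the entry ("trigger LRU"),
// so the next scheduling round starts from a different executor.
offers.get("exec1")
assert(offers.head._1 == "exec2")
```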