@@ -20,14 +20,15 @@ package org.apache.spark.deploy
private[spark] class ApplicationDescription(
val name: String,
val maxCores: Option[Int],
val memoryPerSlave: Int,
val memoryPerExecutor: Int, // in Mb
val command: Command,
val sparkHome: Option[String],
var appUiUrl: String,
val eventLogDir: Option[String] = None)
extends Serializable {

val user = System.getProperty("user.name", "<unknown>")

// only valid when spark.executor.multiPerWorker is set to true
var maxCorePerExecutor = maxCores
Contributor:

I think the two variables in var maxCorePerExecutor = maxCores are different things: maxCores is the total number of cores for the application, while maxCorePerExecutor is the number of cores per executor. In schedule(), the app's leftCoreToAssign comes from the maxCores value, so the two should not be set equal.

Contributor Author:

It's just an initial value.

Contributor:

Yes, I know. But in ApplicationInfo.scala the coresLeft value is the same as desc.maxCores, so in schedule() leftCoreToAssign can end up equal to maxCorePerExecutor. That doesn't look right, because leftCoreToAssign is the total number of cores across all executors, while maxCorePerExecutor is the number of cores for a single executor. I'm not sure whether I've made myself clear.

Contributor Author:

Why do you think that "in schedule() leftCoreToAssign actually equals maxCorePerExecutor"? It is the minimum of app.coresLeft and the sum of all workers' free cores: var leftCoreToAssign = math.min(app.coresLeft, usableWorkers.map(_.coresFree).sum)

Contributor:

Yes, but in ApplicationInfo, app.coresLeft is equal to app.maxCores. So in schedule(), when the sum of all workers' free cores is greater than app.coresLeft, leftCoreToAssign is actually equal to maxCorePerExecutor.

Contributor:

Yes, thanks, I see.
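To make the quantities in this thread concrete, here is a minimal, self-contained sketch (not part of the patch; the values are hypothetical) of how maxCores, coresLeft and maxCorePerExecutor relate to leftCoreToAssign:

// Hypothetical illustration only: an app asking for 8 cores in total,
// capped at 2 cores per executor, on two workers with 3 free cores each.
object CoreAccountingSketch extends App {
  val maxCores = 8                  // application-wide cap (app.desc.maxCores)
  val maxCorePerExecutor = 2        // per-executor cap, set later by the scheduler backend
  val workerFreeCores = Seq(3, 3)

  val coresLeft = maxCores          // ApplicationInfo.coresLeft starts at maxCores
  val leftCoreToAssign = math.min(coresLeft, workerFreeCores.sum)   // min(8, 6) = 6

  // leftCoreToAssign is the total still to hand out across all executors;
  // maxCorePerExecutor only bounds what a single executor may receive.
  println(s"leftCoreToAssign = $leftCoreToAssign, per-executor cap = $maxCorePerExecutor")
}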

override def toString: String = "ApplicationDescription(" + name + ")"
}
@@ -45,7 +45,7 @@ private[spark] object JsonProtocol {
("name" -> obj.desc.name) ~
("cores" -> obj.desc.maxCores) ~
("user" -> obj.desc.user) ~
("memoryperslave" -> obj.desc.memoryPerSlave) ~
("memoryperslave" -> obj.desc.memoryPerExecutor) ~
("submitdate" -> obj.submitDate.toString) ~
("state" -> obj.state.toString) ~
("duration" -> obj.duration)
Expand All @@ -54,7 +54,7 @@ private[spark] object JsonProtocol {
def writeApplicationDescription(obj: ApplicationDescription) = {
("name" -> obj.name) ~
("cores" -> obj.maxCores) ~
("memoryperslave" -> obj.memoryPerSlave) ~
("memoryperslave" -> obj.memoryPerExecutor) ~
("user" -> obj.user) ~
("sparkhome" -> obj.sparkHome) ~
("command" -> obj.command.toString)
@@ -66,7 +66,7 @@ private[spark] class ApplicationInfo(
}

def addExecutor(worker: WorkerInfo, cores: Int, useID: Option[Int] = None): ExecutorInfo = {
val exec = new ExecutorInfo(newExecutorId(useID), this, worker, cores, desc.memoryPerSlave)
val exec = new ExecutorInfo(newExecutorId(useID), this, worker, cores, desc.memoryPerExecutor)
executors(exec.id) = exec
coresGranted += cores
exec
core/src/main/scala/org/apache/spark/deploy/master/Master.scala (109 additions, 24 deletions)
@@ -20,7 +20,7 @@ package org.apache.spark.deploy.master
import java.text.SimpleDateFormat
import java.util.Date

import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
import scala.collection.mutable.{ListBuffer, ArrayBuffer, HashMap, HashSet}
import scala.concurrent.Await
import scala.concurrent.duration._
import scala.language.postfixOps
@@ -457,35 +457,17 @@ private[spark] class Master(
* launched an executor for the app on it (right now the standalone backend doesn't like having
* two executors on the same worker).
*/
def canUse(app: ApplicationInfo, worker: WorkerInfo): Boolean = {
worker.memoryFree >= app.desc.memoryPerSlave && !worker.hasExecutor(app)
private def canUse(app: ApplicationInfo, worker: WorkerInfo): Boolean = {
worker.memoryFree >= app.desc.memoryPerExecutor && !worker.hasExecutor(app) &&
worker.coresFree > 0
}
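A small sketch of the tightened check (hypothetical worker values, and simplified: the hasExecutor condition is omitted here):

// Illustration only: a worker with plenty of free memory but no free cores
// is now rejected, matching the added coresFree > 0 condition above.
case class WorkerSketch(memoryFree: Int, coresFree: Int)
val memoryPerExecutor = 1024   // MB, assumed application setting
def usable(w: WorkerSketch): Boolean =
  w.memoryFree >= memoryPerExecutor && w.coresFree > 0
println(usable(WorkerSketch(memoryFree = 4096, coresFree = 0)))   // false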

/**
* Schedule the currently available resources among waiting apps. This method will be called
* every time a new app joins or resource availability changes.
*/
def schedule() {
if (state != RecoveryState.ALIVE) { return }

// First schedule drivers, they take strict precedence over applications
val shuffledWorkers = Random.shuffle(workers) // Randomization helps balance drivers
for (worker <- shuffledWorkers if worker.state == WorkerState.ALIVE) {
for (driver <- waitingDrivers) {
if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) {
launchDriver(worker, driver)
waitingDrivers -= driver
}
}
}

// Right now this is a very simple FIFO scheduler. We keep trying to fit in the first app
// in the queue, then the second app, etc.
private def startSingleExecutorPerWorker() {
if (spreadOutApps) {
// Try to spread out each app among all the nodes, until it has all its cores
for (app <- waitingApps if app.coresLeft > 0) {
val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE)
.filter(canUse(app, _)).sortBy(_.coresFree).reverse
val numUsable = usableWorkers.length
val assigned = new Array[Int](numUsable) // Number of cores to give on each node
var toAssign = math.min(app.coresLeft, usableWorkers.map(_.coresFree).sum)
@@ -523,6 +505,109 @@ private[spark] class Master(
}
}
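For contrast with the multi-executor path that follows, a minimal sketch (hypothetical numbers, simplified) of the spread-out, one-core-at-a-time round-robin assignment used when each worker gets a single executor:

// Illustration only: spread an app needing 5 cores over 3 workers with 4, 3 and 2 free cores.
val coresFree = Array(4, 3, 2)
val assigned = Array.fill(coresFree.length)(0)
var toAssign = math.min(5, coresFree.sum)     // app.coresLeft capped by total free cores
var pos = 0
while (toAssign > 0) {
  if (coresFree(pos) - assigned(pos) > 0) {   // hand out one core per visit, round-robin
    assigned(pos) += 1
    toAssign -= 1
  }
  pos = (pos + 1) % coresFree.length
}
println(assigned.mkString(", "))              // 2, 2, 1 -- at most one executor per worker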

private def startMultiExecutorsPerWorker() {
// allow user to run multiple executors in the same worker
// (within the same worker JVM process)
if (spreadOutApps) {
for (app <- waitingApps if app.coresLeft > 0) {
var usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE)
.filter(worker => worker.coresFree > 0 && worker.memoryFree >=
app.desc.memoryPerExecutor).sortBy(_.coresFree).reverse
val maxCoreNumPerExecutor = app.desc.maxCorePerExecutor.get
var mostFreeCoreWorkerPos = 0
val maxAvailableSlots = usableWorkers.map(worker => math.min(worker.coresFree,
worker.memoryFree / app.desc.memoryPerExecutor)).sum
var leftCoreToAssign = math.min(app.coresLeft, maxAvailableSlots)
val numUsable = usableWorkers.length
// Number of cores of each executor assigned to each worker
val assigned = Array.fill[ListBuffer[Int]](numUsable)(new ListBuffer[Int])
val assignedSum = Array.fill[Int](numUsable)(0)
var pos = 0
val noEnoughMemoryWorkers = new HashSet[Int]
var maxPossibleCore = usableWorkers(mostFreeCoreWorkerPos).coresFree
while (leftCoreToAssign > 0 && noEnoughMemoryWorkers.size < numUsable) {
if ((usableWorkers(mostFreeCoreWorkerPos).coresFree - assignedSum(mostFreeCoreWorkerPos) <
usableWorkers(pos).coresFree - assignedSum(pos) ||
noEnoughMemoryWorkers.contains(mostFreeCoreWorkerPos)) &&
usableWorkers(pos).coresFree - assignedSum(pos) > 0 &&
usableWorkers(pos).memoryFree >=
app.desc.memoryPerExecutor * (assigned(pos).length + 1)) {
mostFreeCoreWorkerPos = pos
}
maxPossibleCore = usableWorkers(mostFreeCoreWorkerPos).coresFree -
assignedSum(mostFreeCoreWorkerPos)
val coreToAssign = math.min(math.min(maxCoreNumPerExecutor, maxPossibleCore),
leftCoreToAssign)
if (usableWorkers(pos).coresFree - assignedSum(pos) >= coreToAssign) {
if (usableWorkers(pos).memoryFree >=
app.desc.memoryPerExecutor * (assigned(pos).length + 1)) {
leftCoreToAssign -= coreToAssign
assigned(pos) += coreToAssign
assignedSum(pos) += coreToAssign
}
}
if (usableWorkers(pos).memoryFree < app.desc.memoryPerExecutor *
(assigned(pos).length + 1)) {
noEnoughMemoryWorkers += pos
}
pos = (pos + 1) % numUsable
}

// Now that we've decided how many executors and the core number for each to
// give on each node, let's actually give them
for (pos <- 0 until numUsable) {
for (execIdx <- 0 until assigned(pos).length) {
val exec = app.addExecutor(usableWorkers(pos), assigned(pos)(execIdx))
launchExecutor(usableWorkers(pos), exec)
app.state = ApplicationState.RUNNING
}
}
}
} else {
// Pack each app into as few nodes as possible until we've assigned all its cores
for (worker <- workers if worker.coresFree > 0 && worker.state == WorkerState.ALIVE) {
for (app <- waitingApps if app.coresLeft > 0 &&
app.desc.memoryPerExecutor <= worker.memoryFree) {
val coreNumPerExecutor = app.desc.maxCorePerExecutor.get
var coresLeft = math.min(worker.coresFree, app.coresLeft)
while (coresLeft > 0 && app.desc.memoryPerExecutor <= worker.memoryFree) {
val coreToAssign = math.min(coreNumPerExecutor, coresLeft)
val exec = app.addExecutor(worker, coreToAssign)
launchExecutor(worker, exec)
coresLeft -= coreToAssign
app.state = ApplicationState.RUNNING
}
}
}
}
}
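A rough worked example of what the spread-out branch above aims for, with assumed numbers and the memory bookkeeping left out; this is a simplification, not the patch's loop:

// Illustration only: an app with 8 cores left, maxCorePerExecutor = 2,
// and two usable workers with 5 and 3 free cores.
val maxCorePerExecutor = 2
val workerCoresFree = Array(5, 3)
val perWorkerExecutors = Array.fill(workerCoresFree.length)(List.empty[Int])
var leftCoreToAssign = math.min(8, workerCoresFree.sum)
var pos = 0
while (leftCoreToAssign > 0) {
  val alreadyUsed = perWorkerExecutors(pos).sum
  val grant = math.min(math.min(maxCorePerExecutor, workerCoresFree(pos) - alreadyUsed),
    leftCoreToAssign)
  if (grant > 0) {
    // each grant becomes one new executor of `grant` cores on this worker
    perWorkerExecutors(pos) = perWorkerExecutors(pos) :+ grant
    leftCoreToAssign -= grant
  }
  pos = (pos + 1) % workerCoresFree.length
}
// prints [2,2,1] [2,1]: five executors, none larger than maxCorePerExecutor
println(perWorkerExecutors.map(_.mkString("[", ",", "]")).mkString(" "))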


/**
* Schedule the currently available resources among waiting apps. This method will be called
* every time a new app joins or resource availability changes.
*/
def schedule() {
if (state != RecoveryState.ALIVE) { return }

// First schedule drivers, they take strict precedence over applications
val shuffledWorkers = Random.shuffle(workers) // Randomization helps balance drivers
for (worker <- shuffledWorkers if worker.state == WorkerState.ALIVE) {
for (driver <- waitingDrivers) {
if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) {
launchDriver(worker, driver)
waitingDrivers -= driver
}
}
}

if (!conf.getBoolean("spark.executor.multiPerWorker", true)) {
startSingleExecutorPerWorker()
} else {
startMultiExecutorsPerWorker()
}
}

def launchExecutor(worker: WorkerInfo, exec: ExecutorInfo) {
logInfo("Launching executor " + exec.fullId + " on worker " + worker.id)
worker.addExecutor(exec)
@@ -79,7 +79,7 @@ private[spark] class ApplicationPage(parent: MasterWebUI) extends WebUIPage("app
</li>
<li>
<strong>Executor Memory:</strong>
{Utils.megabytesToString(app.desc.memoryPerSlave)}
{Utils.megabytesToString(app.desc.memoryPerExecutor)}
</li>
<li><strong>Submit Date:</strong> {app.submitDate}</li>
<li><strong>State:</strong> {app.state}</li>
@@ -165,8 +165,8 @@ private[spark] class MasterPage(parent: MasterWebUI) extends WebUIPage("") {
<td>
{app.coresGranted}
</td>
<td sorttable_customkey={app.desc.memoryPerSlave.toString}>
{Utils.megabytesToString(app.desc.memoryPerSlave)}
<td sorttable_customkey={app.desc.memoryPerExecutor.toString}>
{Utils.megabytesToString(app.desc.memoryPerExecutor)}
</td>
<td>{UIUtils.formatDate(app.submitDate)}</td>
<td>{app.desc.user}</td>
@@ -59,7 +59,7 @@ private[spark] class SparkDeploySchedulerBackend(
val sparkHome = sc.getSparkHome()
val appDesc = new ApplicationDescription(sc.appName, maxCores, sc.executorMemory, command,
sparkHome, sc.ui.appUIAddress, sc.eventLogger.map(_.logDir))

if (conf.getBoolean("spark.executor.multiPerWorker", true)) {
appDesc.maxCorePerExecutor = Some(conf.getInt("spark.executor.maxCoreNumPerExecutor", 1))
}
client = new AppClient(sc.env.actorSystem, masters, appDesc, this, conf)
client.start()
}
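A usage sketch for the two properties wired up here (the property names come from this patch; the master URL, app name and core counts are illustrative):

import org.apache.spark.{SparkConf, SparkContext}

// Illustration only: opt into multiple executors per worker, each capped at 2 cores.
val conf = new SparkConf()
  .setMaster("spark://master:7077")              // assumed standalone master URL
  .setAppName("multi-executor-demo")
  .set("spark.executor.multiPerWorker", "true")
  .set("spark.executor.maxCoreNumPerExecutor", "2")
  .set("spark.cores.max", "8")                   // overall cap that becomes maxCores
val sc = new SparkContext(conf)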
docs/configuration.md (15 additions)
@@ -674,6 +674,21 @@ Apart from these, the following properties are also available, and may be useful
with spark.executor.memory.
</td>
</tr>
<tr>
<td>spark.executor.multiPerWorker</td>
<td>false</td>
<td>
Whether to allow running multiple executors of the same application on a single worker.
</td>
</tr>
<tr>
<td>spark.executor.maxCoreNumPerExecutor</td>
<td>1</td>
<td>
The maximum number of cores assigned to each executor; this property only takes effect when
<code>spark.executor.multiPerWorker</code> is set to true.
</td>
</tr>
<tr>
<td>spark.executor.extraClassPath</td>
<td>(none)</td>