Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions docs/running-on-yarn.md
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,13 @@ Most of the configs are the same for Spark on YARN as for other deployment modes
the environment of the executor launcher.
</td>
</tr>
<tr>
<td><code>spark.yarn.containerLauncherMaxThreads</code></td>
<td>25</td>
<td>
The maximum number of threads to use in the application master for launching executor containers.
</td>
</tr>
</table>

# Launching Spark on YARN
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
package org.apache.spark.deploy.yarn

import java.util.{List => JList}
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent._
import java.util.concurrent.atomic.AtomicInteger

import scala.collection.JavaConversions._
Expand All @@ -32,6 +32,8 @@ import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkEnv}
import org.apache.spark.scheduler.{SplitInfo, TaskSchedulerImpl}
import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend

import com.google.common.util.concurrent.ThreadFactoryBuilder

object AllocationType extends Enumeration {
type AllocationType = Value
val HOST, RACK, ANY = Value
Expand Down Expand Up @@ -95,6 +97,14 @@ private[yarn] abstract class YarnAllocator(
protected val (preferredHostToCount, preferredRackToCount) =
generateNodeToWeight(conf, preferredNodes)

private val launcherPool = new ThreadPoolExecutor(
// max pool size of Integer.MAX_VALUE is ignored because we use an unbounded queue
sparkConf.getInt("spark.yarn.containerLauncherMaxThreads", 25), Integer.MAX_VALUE,
1, TimeUnit.MINUTES,
new LinkedBlockingQueue[Runnable](),
new ThreadFactoryBuilder().setNameFormat("ContainerLauncher #%d").setDaemon(true).build())
launcherPool.allowCoreThreadTimeOut(true)

def getNumExecutorsRunning: Int = numExecutorsRunning.intValue

def getNumExecutorsFailed: Int = numExecutorsFailed.intValue
Expand Down Expand Up @@ -283,7 +293,7 @@ private[yarn] abstract class YarnAllocator(
executorMemory,
executorCores,
securityMgr)
new Thread(executorRunnable).start()
launcherPool.execute(executorRunnable)
}
}
logDebug("""
Expand Down