@@ -19,7 +19,6 @@ package org.apache.spark.deploy.yarn
 
 import java.util.Collections
 import java.util.concurrent._
-import java.util.concurrent.atomic.AtomicInteger
 import java.util.regex.Pattern
 
 import scala.collection.JavaConversions._
@@ -35,22 +34,16 @@ import org.apache.hadoop.yarn.util.RackResolver
 
 import org.apache.spark.{Logging, SecurityManager, SparkConf}
 import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._
-import org.apache.spark.scheduler.SplitInfo
 import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
 
-object AllocationType extends Enumeration {
-  type AllocationType = Value
-  val HOST, RACK, ANY = Value
-}
-
 /**
  * YarnAllocator is charged with requesting containers from the YARN ResourceManager and deciding
  * what to do with containers when YARN fulfills these requests.
  *
  * This class makes use of YARN's AMRMClient APIs. We interact with the AMRMClient in three ways:
  * * Making our resource needs known, which updates local bookkeeping about containers requested.
  * * Calling "allocate", which syncs our local container requests with the RM, and returns any
- *   containers that YARN has granted to us.
+ *   containers that YARN has granted to us. This also functions as a heartbeat.
  * * Processing the containers granted to us to possibly launch executors inside of them.
  *
  * The public methods of this class are thread-safe. All methods that mutate state are
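The three interactions described in the docstring above map onto a small set of AMRMClient calls. The following standalone sketch is added here for illustration only (it is not part of the patch); client initialization and ApplicationMaster registration are omitted, and the memory, core, and priority values are placeholders:

import scala.collection.JavaConversions._

import org.apache.hadoop.yarn.api.records.{Priority, Resource}
import org.apache.hadoop.yarn.client.api.AMRMClient
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest

// Assumes an initialized, started AMRMClient that has already registered the AM with the RM.
val amClient = AMRMClient.createAMRMClient[ContainerRequest]()

// 1. Make our resource needs known; this only updates the client's local bookkeeping.
val capability = Resource.newInstance(2048, 1) // 2048 MB, 1 vCore (placeholder values)
amClient.addContainerRequest(new ContainerRequest(capability, null, null, Priority.newInstance(1)))

// 2. "allocate" syncs the pending requests with the RM and doubles as the AM heartbeat,
//    so it must be called periodically even when nothing new is requested.
val response = amClient.allocate(0.1f)

// 3. Process whatever the RM granted, e.g. by launching an executor in each container.
for (container <- response.getAllocatedContainers) {
  // launch an executor inside `container` ...
}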
@@ -68,18 +61,17 @@ private[yarn] class YarnAllocator(
   import YarnAllocator._
 
   // These two complementary data structures are locked on allocatedHostToContainersMap.
-  private[yarn] val allocatedHostToContainersMap =
+  // Visible for testing.
+  val allocatedHostToContainersMap =
     new HashMap[String, collection.mutable.Set[ContainerId]]
-  private[yarn] val allocatedContainerToHostMap = new HashMap[ContainerId, String]
+  val allocatedContainerToHostMap = new HashMap[ContainerId, String]
 
   // Containers that we no longer care about. We've either already told the RM to release them or
   // will on the next heartbeat. Containers get removed from this map after the RM tells us they've
   // completed.
   private val releasedContainers = Collections.newSetFromMap[ContainerId](
     new ConcurrentHashMap[ContainerId, java.lang.Boolean])
 
-  // Number of container requests that have been sent to, but not yet allocated by the
-  // ApplicationMaster.
   @volatile private var numExecutorsRunning = 0
   // Used to generate a unique ID per executor
   private var executorIdCounter = 0
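The two complementary maps above must always agree, or containers leak from the bookkeeping. A minimal sketch of keeping them in sync (helper names are assumptions for the example, not code from the patch):

import scala.collection.mutable.{HashMap, HashSet}

import org.apache.hadoop.yarn.api.records.ContainerId

val allocatedHostToContainersMap = new HashMap[String, collection.mutable.Set[ContainerId]]
val allocatedContainerToHostMap = new HashMap[ContainerId, String]

// Record that a container was placed on a host: update both directions together.
def recordAssignment(host: String, containerId: ContainerId): Unit = {
  allocatedHostToContainersMap.getOrElseUpdate(host, new HashSet[ContainerId]) += containerId
  allocatedContainerToHostMap.put(containerId, host)
}

// Forget a completed container: remove it from both maps, dropping empty host entries.
def recordCompletion(containerId: ContainerId): Unit = {
  for (host <- allocatedContainerToHostMap.remove(containerId);
       containers <- allocatedHostToContainersMap.get(host)) {
    containers -= containerId
    if (containers.isEmpty) allocatedHostToContainersMap.remove(host)
  }
}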
@@ -133,20 +125,9 @@ private[yarn] class YarnAllocator(
 
   /**
    * Request as many executors from the ResourceManager as needed to reach the desired total.
-   * This takes into account executors already running or pending.
    */
   def requestTotalExecutors(requestedTotal: Int): Unit = synchronized {
-    val currentTotal = getNumPendingAllocate + numExecutorsRunning
-    if (requestedTotal > currentTotal) {
-      maxExecutors += (requestedTotal - currentTotal)
-      // We need to call `allocateResources` here to avoid the following race condition:
-      // If we request executors twice before `allocateResources` is called, then we will end up
-      // double counting the number requested because `numPendingAllocate` is not updated yet.
-      allocateResources()
-    } else {
-      logInfo(s"Not allocating more executors because there are already $currentTotal " +
-        s"(application requested $requestedTotal total)")
-    }
+    maxExecutors = requestedTotal
   }
 
   /**
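The simplification above turns requestTotalExecutors from an imperative "add this many more" call into a declarative target. A rough sketch of why the new form needs no immediate allocateResources call and cannot double count (simplified names, added for illustration only, not the actual class):

// Simplified model of the target-based approach: the setter only records the desired
// total, and the periodic allocate heartbeat reconciles it with the current state.
class TargetBasedAllocator {
  private var maxExecutors = 0          // desired total, set by the scheduler backend
  private var numPendingAllocate = 0    // requests sent to the RM but not yet granted
  private var numExecutorsRunning = 0   // executors already launched

  def requestTotalExecutors(requestedTotal: Int): Unit = synchronized {
    // Idempotent: calling this twice with the same value requests nothing extra.
    maxExecutors = requestedTotal
  }

  def allocateResources(): Unit = synchronized {
    val missing = maxExecutors - numPendingAllocate - numExecutorsRunning
    if (missing > 0) {
      numPendingAllocate += missing    // would become `missing` ContainerRequests to YARN
    }
  }
}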
@@ -177,9 +158,8 @@ private[yarn] class YarnAllocator(
     val missing = maxExecutors - numPendingAllocate - numExecutorsRunning
 
     if (missing > 0) {
-      val totalExecutorMemory = resource.getMemory
-      logInfo(s"Will request $missing executor containers, each with $totalExecutorMemory MB " +
-        s"memory including $memoryOverhead MB overhead")
+      logInfo(s"Will request $missing executor containers, each with ${resource.getVirtualCores} " +
+        s"cores and ${resource.getMemory} MB memory including $memoryOverhead MB overhead")
     }
 
     addResourceRequests(missing)
@@ -212,19 +192,12 @@ private[yarn] class YarnAllocator(
   }
 
   /**
-   * Request numExecutors additional containers from YARN.
+   * Request numExecutors additional containers from YARN. Visible for testing.
    */
-  private[yarn] def addResourceRequests(numExecutors: Int): Unit = {
-    val containerRequests = new ArrayBuffer[ContainerRequest]
+  def addResourceRequests(numExecutors: Int): Unit = {
     for (i <- 0 until numExecutors) {
-      containerRequests += new ContainerRequest(resource, null, null, RM_REQUEST_PRIORITY)
-    }
-
-    for (request <- containerRequests) {
+      val request = new ContainerRequest(resource, null, null, RM_REQUEST_PRIORITY)
       amClient.addContainerRequest(request)
-    }
-
-    for (request <- containerRequests) {
       val nodes = request.getNodes
       val hostStr = if (nodes == null || nodes.isEmpty) "Any" else nodes.last
       logInfo("Container request (host: %s, capability: %s".format(hostStr, resource))
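Passing null for both the nodes and racks arguments of ContainerRequest is what makes getNodes come back empty and the log print "Any". A hedged, standalone example of such an unconstrained request (capability and priority values are placeholders, not taken from the patch):

import org.apache.hadoop.yarn.api.records.{Priority, Resource}
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest

// An "anywhere" request: no preferred hosts, no preferred racks.
val anyHostRequest = new ContainerRequest(
  Resource.newInstance(1024, 1),   // capability: 1024 MB, 1 vCore (placeholders)
  null,                            // nodes: no host preference
  null,                            // racks: no rack preference
  Priority.newInstance(1))         // priority used to match requests against grants

assert(anyHostRequest.getNodes == null || anyHostRequest.getNodes.isEmpty)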
@@ -236,8 +209,10 @@ private[yarn] class YarnAllocator(
    *
    * Due to the way the YARN allocation protocol works, certain healthy race conditions can result
    * in YARN granting containers that we no longer need. In this case, we release them.
+   *
+   * Visible for testing.
    */
-  private[yarn] def handleAllocatedContainers(allocatedContainers: Seq[Container]): Unit = {
+  def handleAllocatedContainers(allocatedContainers: Seq[Container]): Unit = {
     val containersToUse = new ArrayBuffer[Container](allocatedContainers.size)
 
     // Match incoming requests by host
@@ -263,14 +238,16 @@ private[yarn] class YarnAllocator(
     }
 
     if (!remainingAfterOffRackMatches.isEmpty) {
+      logDebug(s"Releasing ${remainingAfterOffRackMatches.size} unneeded containers that were " +
+        s"allocated to us")
       for (container <- remainingAfterOffRackMatches) {
         internalReleaseContainer(container)
       }
     }
 
     runAllocatedContainers(containersToUse)
 
-    logInfo("Received %d containers from YARN, launching executors on %d."
+    logInfo("Received %d containers from YARN, launching executors on %d of them."
       .format(allocatedContainers.size, containersToUse.size))
   }
 
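Releasing an unneeded container is a two-sided bookkeeping step: remember locally that we gave it up, and tell the RM. The patch only calls internalReleaseContainer; the body below is an assumed sketch of what such a helper does in terms of the AMRMClient API, not the method's actual implementation:

import org.apache.hadoop.yarn.api.records.Container

// Assumed sketch: `releasedContainers` and `amClient` stand for the fields defined earlier in the class.
def internalReleaseContainer(container: Container): Unit = {
  // Track the container until the RM reports it as completed, so the completion isn't treated as a failure.
  releasedContainers.add(container.getId)
  // The release is communicated to the RM on the next allocate heartbeat.
  amClient.releaseAssignedContainer(container.getId)
}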
@@ -279,6 +256,8 @@ private[yarn] class YarnAllocator(
    * finds one, removes the request so that it won't be submitted again. Places the container into
    * containersToUse or remaining.
    *
+   * @param allocatedContainer container that was given to us by YARN
+   * @param location resource name, either a node, rack, or *
    * @param containersToUse list of containers that will be used
    * @param remaining list of containers that will not be used
    */
@@ -316,13 +295,11 @@ private[yarn] class YarnAllocator(
 
       logInfo("Launching container %s for on host %s".format(containerId, executorHostname))
 
-      allocatedHostToContainersMap.synchronized {
-        val containerSet = allocatedHostToContainersMap.getOrElseUpdate(executorHostname,
-          new HashSet[ContainerId])
+      val containerSet = allocatedHostToContainersMap.getOrElseUpdate(executorHostname,
+        new HashSet[ContainerId])
 
-        containerSet += containerId
-        allocatedContainerToHostMap.put(containerId, executorHostname)
-      }
+      containerSet += containerId
+      allocatedContainerToHostMap.put(containerId, executorHostname)
 
       val executorRunnable = new ExecutorRunnable(
         container,