@@ -60,12 +60,12 @@ object AllocationType extends Enumeration {
6060 */
6161private [yarn] class YarnAllocationHandler (
6262 val conf : Configuration ,
63- val resourceManager : AMRMProtocol ,
63+ val resourceManager : AMRMProtocol ,
6464 val appAttemptId : ApplicationAttemptId ,
6565 val maxExecutors : Int ,
6666 val executorMemory : Int ,
6767 val executorCores : Int ,
68- val preferredHostToCount : Map [String , Int ],
68+ val preferredHostToCount : Map [String , Int ],
6969 val preferredRackToCount : Map [String , Int ],
7070 val sparkConf : SparkConf )
7171 extends Logging {
@@ -138,7 +138,9 @@ private[yarn] class YarnAllocationHandler(
138138 containers += container
139139 }
140140 // Add all ignored containers to released list
141- else releasedContainerList.add(container.getId())
141+ else {
142+ releasedContainerList.add(container.getId())
143+ }
142144 }
143145
144146 // Find the appropriate containers to use. Slightly non trivial groupBy ...
@@ -170,7 +172,7 @@ private[yarn] class YarnAllocationHandler(
170172 // remainingContainers = remaining
171173
172174 // yarn has nasty habit of allocating a tonne of containers on a host - discourage this :
173- // add remaining to release list. If we have insufficient containers, next allocation
175+ // add remaining to release list. If we have insufficient containers, next allocation
174176 // cycle will reallocate (but wont treat it as data local)
175177 for (container <- remaining) releasedContainerList.add(container.getId())
176178 remainingContainers = null
@@ -182,7 +184,7 @@ private[yarn] class YarnAllocationHandler(
182184
183185 if (rack != null ){
184186 val maxExpectedRackCount = preferredRackToCount.getOrElse(rack, 0 )
185- val requiredRackCount = maxExpectedRackCount - allocatedContainersOnRack(rack) -
187+ val requiredRackCount = maxExpectedRackCount - allocatedContainersOnRack(rack) -
186188 rackLocalContainers.get(rack).getOrElse(List ()).size
187189
188190
@@ -213,7 +215,7 @@ private[yarn] class YarnAllocationHandler(
213215 }
214216 }
215217
216- // Now that we have split the containers into various groups, go through them in order :
218+ // Now that we have split the containers into various groups, go through them in order :
217219 // first host local, then rack local and then off rack (everything else).
218220 // Note that the list we create below tries to ensure that not all containers end up within a
219221 // host if there are sufficiently large number of hosts/containers.
@@ -319,17 +321,24 @@ private[yarn] class YarnAllocationHandler(
319321 assert (containerSet != null )
320322
321323 containerSet -= containerId
322- if (containerSet.isEmpty) allocatedHostToContainersMap.remove(host)
323- else allocatedHostToContainersMap.update(host, containerSet)
324+ if (containerSet.isEmpty) {
325+ allocatedHostToContainersMap.remove(host)
326+ } else {
327+ allocatedHostToContainersMap.update(host, containerSet)
328+ }
324329
325330 allocatedContainerToHostMap -= containerId
326331
327332 // Doing this within locked context, sigh ... move to outside ?
328333 val rack = YarnAllocationHandler .lookupRack(conf, host)
329334 if (rack != null ) {
330335 val rackCount = allocatedRackCount.getOrElse(rack, 0 ) - 1
331- if (rackCount > 0 ) allocatedRackCount.put(rack, rackCount)
332- else allocatedRackCount.remove(rack)
336+ if (rackCount > 0 ) {
337+ allocatedRackCount.put(rack, rackCount)
338+ }
339+ else {
340+ allocatedRackCount.remove(rack)
341+ }
333342 }
334343 }
335344 }
@@ -365,10 +374,10 @@ private[yarn] class YarnAllocationHandler(
365374 }
366375 }
367376
368- val requestedContainers : ArrayBuffer [ResourceRequest ] =
377+ val requestedContainers : ArrayBuffer [ResourceRequest ] =
369378 new ArrayBuffer [ResourceRequest ](rackToCounts.size)
370379 for ((rack, count) <- rackToCounts){
371- requestedContainers +=
380+ requestedContainers +=
372381 createResourceRequest(AllocationType .RACK , rack, count, YarnAllocationHandler .PRIORITY )
373382 }
374383
@@ -403,9 +412,9 @@ private[yarn] class YarnAllocationHandler(
403412 AllocationType .ANY , null , numExecutors, YarnAllocationHandler .PRIORITY ))
404413 }
405414 else {
406- // request for all hosts in preferred nodes and for numExecutors -
415+ // request for all hosts in preferred nodes and for numExecutors -
407416 // candidates.size, request by default allocation policy.
408- val hostContainerRequests : ArrayBuffer [ResourceRequest ] =
417+ val hostContainerRequests : ArrayBuffer [ResourceRequest ] =
409418 new ArrayBuffer [ResourceRequest ](preferredHostToCount.size)
410419 for ((candidateHost, candidateCount) <- preferredHostToCount) {
411420 val requiredCount = candidateCount - allocatedContainersOnHost(candidateHost)
@@ -467,7 +476,7 @@ private[yarn] class YarnAllocationHandler(
467476
468477
469478 private def createResourceRequest (
470- requestType : AllocationType .AllocationType ,
479+ requestType : AllocationType .AllocationType ,
471480 resource: String ,
472481 numExecutors : Int ,
473482 priority : Int ): ResourceRequest = {
@@ -528,7 +537,7 @@ private[yarn] class YarnAllocationHandler(
528537 if (! retval.isEmpty) {
529538 releasedContainerList.removeAll(retval)
530539 for (v <- retval) pendingReleaseContainers.put(v, true )
531- logInfo(" Releasing " + retval.size + " containers. pendingReleaseContainers : " +
540+ logInfo(" Releasing " + retval.size + " containers. pendingReleaseContainers : " +
532541 pendingReleaseContainers)
533542 }
534543
@@ -539,7 +548,7 @@ private[yarn] class YarnAllocationHandler(
539548object YarnAllocationHandler {
540549
541550 val ANY_HOST = " *"
542- // All requests are issued with same priority : we do not (yet) have any distinction between
551+ // All requests are issued with same priority : we do not (yet) have any distinction between
543552 // request types (like map/reduce in hadoop for example)
544553 val PRIORITY = 1
545554
@@ -548,7 +557,7 @@ object YarnAllocationHandler {
548557
549558 // Host to rack map - saved from allocation requests
550559 // We are expecting this not to change.
551- // Note that it is possible for this to change : and RM will indicate that to us via update
560+ // Note that it is possible for this to change : and RM will indicate that to us via update
552561 // response to allocate. But we are punting on handling that for now.
553562 private val hostToRack = new ConcurrentHashMap [String , String ]()
554563 private val rackToHostSet = new ConcurrentHashMap [String , JSet [String ]]()
@@ -565,7 +574,7 @@ object YarnAllocationHandler {
565574 conf,
566575 resourceManager,
567576 appAttemptId,
568- args.numExecutors,
577+ args.numExecutors,
569578 args.executorMemory,
570579 args.executorCores,
571580 Map [String , Int ](),
@@ -587,7 +596,7 @@ object YarnAllocationHandler {
587596 conf,
588597 resourceManager,
589598 appAttemptId,
590- args.numExecutors,
599+ args.numExecutors,
591600 args.executorMemory,
592601 args.executorCores,
593602 hostToCount,
0 commit comments