diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
index 8e0533f39ae53..7be3ed4a6e650 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
@@ -54,7 +54,8 @@ private[yarn] class ExecutorRunnable(
     executorCores: Int,
     appId: String,
     securityMgr: SecurityManager,
-    localResources: Map[String, LocalResource]) extends Logging {
+    localResources: Map[String, LocalResource],
+    allocationMeter: Int) extends Logging {
 
   var rpc: YarnRPC = YarnRPC.create(conf)
   var nmClient: NMClient = _
@@ -207,8 +208,16 @@ private[yarn] class ExecutorRunnable(
       }.toSeq
 
     YarnSparkHadoopUtil.addOutOfMemoryErrorArgument(javaOpts)
+
+    // Bind the executor JVM to a single NUMA node through numactl. The node index is
+    // allocationMeter modulo 2.
+    // TODO: the node count is hard-coded to 2 and numactl is assumed to be available on the
+    // container host; read this from configuration instead, e.g. sparkConf.get(NUMA_NODE).
+    val numaNode = allocationMeter % 2
+
     val commands = prefixEnv ++ Seq(
-      YarnSparkHadoopUtil.expandEnvironment(Environment.JAVA_HOME) + "/bin/java",
+      s" numactl --cpunodebind=$numaNode --preferred=$numaNode " +
+        YarnSparkHadoopUtil.expandEnvironment(Environment.JAVA_HOME) + "/bin/java",
       "-server") ++
       javaOpts ++
       Seq("org.apache.spark.executor.CoarseGrainedExecutorBackend",
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
index 0b66d1cf08eac..cd9906a7a21ad 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -130,6 +130,9 @@ private[yarn] class YarnAllocator(
   private var numUnexpectedContainerRelease = 0L
   private val containerIdToExecutorId = new HashMap[ContainerId, String]
 
+  // Per-host count of executor launches requested so far, keyed by executor hostname.
+  val hostToAllocationMeterMap = new HashMap[String, Int]
+
   // Executor memory in MB.
   protected val executorMemory = sparkConf.get(EXECUTOR_MEMORY).toInt
   // Additional memory overhead.
@@ -507,6 +510,12 @@ private[yarn] class YarnAllocator(
 
       if (numExecutorsRunning < targetNumExecutors) {
         if (launchContainers) {
+          // Per-host launch counter: pass the current count for this host to the new
+          // ExecutorRunnable, then increment it. This happens before the launch task is
+          // handed to launcherPool, not inside run().
+          val allocationMeter = hostToAllocationMeterMap.getOrElse(executorHostname, 0)
+          hostToAllocationMeterMap(executorHostname) = allocationMeter + 1
+
           launcherPool.execute(new Runnable {
             override def run(): Unit = {
               try {
@@ -521,7 +530,8 @@ private[yarn] class YarnAllocator(
                   executorCores,
                   appAttemptId.getApplicationId.toString,
                   securityMgr,
-                  localResources
+                  localResources,
+                  allocationMeter
                 ).run()
                 updateInternalState()
               } catch {