From 4ab696a813889e75cb3a5d49bbfdf37e938d8dc5 Mon Sep 17 00:00:00 2001 From: Xiangrui Meng Date: Thu, 19 Jun 2014 01:51:47 -0700 Subject: [PATCH 1/5] bootstrap to retrieve driver spark conf --- .../CoarseGrainedExecutorBackend.scala | 52 ++++++++++++------- .../cluster/CoarseGrainedClusterMessage.scala | 3 ++ .../CoarseGrainedSchedulerBackend.scala | 2 + 3 files changed, 37 insertions(+), 20 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala index 2279d77c91c8..65c7df4bd88f 100644 --- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala +++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala @@ -18,9 +18,14 @@ package org.apache.spark.executor import java.nio.ByteBuffer +import java.util.concurrent.TimeUnit + +import scala.concurrent.Await import akka.actor._ import akka.remote._ +import akka.pattern.Patterns +import akka.util.Timeout import org.apache.spark.{SparkEnv, Logging, SecurityManager, SparkConf} import org.apache.spark.TaskState.TaskState @@ -101,26 +106,33 @@ private[spark] object CoarseGrainedExecutorBackend { workerUrl: Option[String]) { SparkHadoopUtil.get.runAsSparkUser { () => - // Debug code - Utils.checkHost(hostname) - - val conf = new SparkConf - // Create a new ActorSystem to run the backend, because we can't create a - // SparkEnv / Executor before getting started with all our system properties, etc - val (actorSystem, boundPort) = AkkaUtils.createActorSystem("sparkExecutor", hostname, 0, - conf, new SecurityManager(conf)) - // set it - val sparkHostPort = hostname + ":" + boundPort - actorSystem.actorOf( - Props(classOf[CoarseGrainedExecutorBackend], driverUrl, executorId, - sparkHostPort, cores), - name = "Executor") - workerUrl.foreach { - url => - actorSystem.actorOf(Props(classOf[WorkerWatcher], url), name = "WorkerWatcher") - } - actorSystem.awaitTermination() - + // Debug code + Utils.checkHost(hostname) + + // Bootstrap to fetch the driver's Spark properties. + val executorConf = new SparkConf + val (fetcher, _) = AkkaUtils.createActorSystem( + "driverConfFetcher", hostname, 0, executorConf, new SecurityManager(executorConf)) + val driver = fetcher.actorSelection(driverUrl) + val timeout = new Timeout(5, TimeUnit.MINUTES) + val fut = Patterns.ask(driver, RetrieveSparkProps, timeout) + val props = Await.result(fut, timeout.duration).asInstanceOf[Seq[(String, String)]] + fetcher.shutdown() + + // Create a new ActorSystem to run the backend, because we can't create a + // SparkEnv / Executor before getting started with all our system properties, etc + val driverConf = new SparkConf().setAll(props) + val (actorSystem, boundPort) = AkkaUtils.createActorSystem( + "sparkExecutor", hostname, 0, driverConf, new SecurityManager(driverConf)) + // set it + val sparkHostPort = hostname + ":" + boundPort + actorSystem.actorOf( + Props(classOf[CoarseGrainedExecutorBackend], driverUrl, executorId, sparkHostPort, cores), + name = "Executor") + workerUrl.foreach { url => + actorSystem.actorOf(Props(classOf[WorkerWatcher], url), name = "WorkerWatcher") + } + actorSystem.awaitTermination() } } diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala index ca74069ef885..93461694137c 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala @@ -22,11 +22,14 @@ import java.nio.ByteBuffer import org.apache.spark.TaskState.TaskState import org.apache.spark.scheduler.TaskDescription import org.apache.spark.util.{SerializableBuffer, Utils} +import org.apache.spark.SparkConf private[spark] sealed trait CoarseGrainedClusterMessage extends Serializable private[spark] object CoarseGrainedClusterMessages { + case object RetrieveSparkProps extends CoarseGrainedClusterMessage + // Driver to executors case class LaunchTask(data: SerializableBuffer) extends CoarseGrainedClusterMessage diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index e47a060683a2..96a98ea2acfa 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -124,6 +124,8 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A addressToExecutorId.get(address).foreach(removeExecutor(_, "remote Akka client disassociated")) + case RetrieveSparkProps => + sender ! sparkProperties } // Make fake resource offers on all executors From 7947c18aa58a2a94661fd824fef5474755b0ca2c Mon Sep 17 00:00:00 2001 From: Xiangrui Meng Date: Thu, 19 Jun 2014 02:23:47 -0700 Subject: [PATCH 2/5] increase slack size for akka --- .../spark/executor/CoarseGrainedExecutorBackend.scala | 2 +- .../scala/org/apache/spark/executor/Executor.scala | 2 +- .../cluster/CoarseGrainedSchedulerBackend.scala | 11 ++++++----- .../main/scala/org/apache/spark/util/AkkaUtils.scala | 3 +++ 4 files changed, 11 insertions(+), 7 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala index 65c7df4bd88f..6468219af40b 100644 --- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala +++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala @@ -112,7 +112,7 @@ private[spark] object CoarseGrainedExecutorBackend { // Bootstrap to fetch the driver's Spark properties. val executorConf = new SparkConf val (fetcher, _) = AkkaUtils.createActorSystem( - "driverConfFetcher", hostname, 0, executorConf, new SecurityManager(executorConf)) + "driverPropsFetcher", hostname, 0, executorConf, new SecurityManager(executorConf)) val driver = fetcher.actorSelection(driverUrl) val timeout = new Timeout(5, TimeUnit.MINUTES) val fut = Patterns.ask(driver, RetrieveSparkProps, timeout) diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala index baee7a216a7c..557b9a3f46a0 100644 --- a/core/src/main/scala/org/apache/spark/executor/Executor.scala +++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala @@ -212,7 +212,7 @@ private[spark] class Executor( val serializedDirectResult = ser.serialize(directResult) logInfo("Serialized size of result for " + taskId + " is " + serializedDirectResult.limit) val serializedResult = { - if (serializedDirectResult.limit >= akkaFrameSize - 1024) { + if (serializedDirectResult.limit >= akkaFrameSize - AkkaUtils.reservedSizeBytes) { logInfo("Storing result for " + taskId + " in local BlockManager") val blockId = TaskResultBlockId(taskId) env.blockManager.putBytes( diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index 96a98ea2acfa..fe3c9ce38da4 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -145,14 +145,15 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A for (task <- tasks.flatten) { val ser = SparkEnv.get.closureSerializer.newInstance() val serializedTask = ser.serialize(task) - if (serializedTask.limit >= akkaFrameSize - 1024) { + if (serializedTask.limit >= akkaFrameSize - AkkaUtils.reservedSizeBytes) { val taskSetId = scheduler.taskIdToTaskSetId(task.taskId) scheduler.activeTaskSets.get(taskSetId).foreach { taskSet => try { - var msg = "Serialized task %s:%d was %d bytes which " + - "exceeds spark.akka.frameSize (%d bytes). " + - "Consider using broadcast variables for large values." - msg = msg.format(task.taskId, task.index, serializedTask.limit, akkaFrameSize) + var msg = "Serialized task %s:%d was %d bytes, which exceeds max allowed: " + + "spark.akka.frameSize (%d bytes) - reserved (%d bytes). Consider increasing " + + "spark.akka.frameSize or using broadcast variables for large values." + msg = msg.format(task.taskId, task.index, serializedTask.limit, akkaFrameSize, + AkkaUtils.reservedSizeBytes) taskSet.abort(msg) } catch { case e: Exception => logError("Exception in error callback", e) diff --git a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala index a8d12bb2a016..9930c717492f 100644 --- a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala +++ b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala @@ -121,4 +121,7 @@ private[spark] object AkkaUtils extends Logging { def maxFrameSizeBytes(conf: SparkConf): Int = { conf.getInt("spark.akka.frameSize", 10) * 1024 * 1024 } + + /** Space reserved for extra data in an Akka message besides serialized task or task result. */ + val reservedSizeBytes = 200 * 1024 } From 46d332d924c3795bd79b9b0bf4a251fe8d669027 Mon Sep 17 00:00:00 2001 From: Xiangrui Meng Date: Thu, 19 Jun 2014 03:58:00 -0700 Subject: [PATCH 3/5] fix a test --- .../spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala index efef9d26dadc..f77661ccbd1c 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala @@ -35,7 +35,7 @@ class CoarseGrainedSchedulerBackendSuite extends FunSuite with LocalSparkContext val thrown = intercept[SparkException] { larger.collect() } - assert(thrown.getMessage.contains("Consider using broadcast variables for large values")) + assert(thrown.getMessage.contains("using broadcast variables for large values")) val smaller = sc.parallelize(1 to 4).collect() assert(smaller.size === 4) } From 68e1dfbd484f8134c801e67cd510b428240da17c Mon Sep 17 00:00:00 2001 From: Xiangrui Meng Date: Thu, 19 Jun 2014 11:10:54 -0700 Subject: [PATCH 4/5] use timeout from AkkaUtils; remove props from RegisteredExecutor --- .../CoarseGrainedExecutorBackend.scala | 20 ++++++++----------- .../cluster/CoarseGrainedClusterMessage.scala | 3 +-- .../CoarseGrainedSchedulerBackend.scala | 2 +- 3 files changed, 10 insertions(+), 15 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala index 6468219af40b..62947eda90f0 100644 --- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala +++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala @@ -18,14 +18,12 @@ package org.apache.spark.executor import java.nio.ByteBuffer -import java.util.concurrent.TimeUnit import scala.concurrent.Await import akka.actor._ import akka.remote._ import akka.pattern.Patterns -import akka.util.Timeout import org.apache.spark.{SparkEnv, Logging, SecurityManager, SparkConf} import org.apache.spark.TaskState.TaskState @@ -39,10 +37,8 @@ private[spark] class CoarseGrainedExecutorBackend( driverUrl: String, executorId: String, hostPort: String, - cores: Int) - extends Actor - with ExecutorBackend - with Logging { + cores: Int, + sparkProperties: Seq[(String, String)]) extends Actor with ExecutorBackend with Logging { Utils.checkHostPort(hostPort, "Expected hostport") @@ -57,7 +53,7 @@ private[spark] class CoarseGrainedExecutorBackend( } override def receive = { - case RegisteredExecutor(sparkProperties) => + case RegisteredExecutor => logInfo("Successfully registered with driver") // Make this host instead of hostPort ? executor = new Executor(executorId, Utils.parseHostPort(hostPort)._1, sparkProperties, @@ -114,20 +110,20 @@ private[spark] object CoarseGrainedExecutorBackend { val (fetcher, _) = AkkaUtils.createActorSystem( "driverPropsFetcher", hostname, 0, executorConf, new SecurityManager(executorConf)) val driver = fetcher.actorSelection(driverUrl) - val timeout = new Timeout(5, TimeUnit.MINUTES) + val timeout = AkkaUtils.askTimeout(executorConf) val fut = Patterns.ask(driver, RetrieveSparkProps, timeout) - val props = Await.result(fut, timeout.duration).asInstanceOf[Seq[(String, String)]] + val props = Await.result(fut, timeout).asInstanceOf[Seq[(String, String)]] fetcher.shutdown() - // Create a new ActorSystem to run the backend, because we can't create a - // SparkEnv / Executor before getting started with all our system properties, etc + // Create a new ActorSystem using driver's Spark properties to run the backend. val driverConf = new SparkConf().setAll(props) val (actorSystem, boundPort) = AkkaUtils.createActorSystem( "sparkExecutor", hostname, 0, driverConf, new SecurityManager(driverConf)) // set it val sparkHostPort = hostname + ":" + boundPort actorSystem.actorOf( - Props(classOf[CoarseGrainedExecutorBackend], driverUrl, executorId, sparkHostPort, cores), + Props(classOf[CoarseGrainedExecutorBackend], + driverUrl, executorId, sparkHostPort, cores, props), name = "Executor") workerUrl.foreach { url => actorSystem.actorOf(Props(classOf[WorkerWatcher], url), name = "WorkerWatcher") diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala index 93461694137c..1d12ce78a65c 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala @@ -36,8 +36,7 @@ private[spark] object CoarseGrainedClusterMessages { case class KillTask(taskId: Long, executor: String, interruptThread: Boolean) extends CoarseGrainedClusterMessage - case class RegisteredExecutor(sparkProperties: Seq[(String, String)]) - extends CoarseGrainedClusterMessage + case object RegisteredExecutor extends CoarseGrainedClusterMessage case class RegisterExecutorFailed(message: String) extends CoarseGrainedClusterMessage diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index fe3c9ce38da4..05d01b0c821f 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -75,7 +75,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A sender ! RegisterExecutorFailed("Duplicate executor ID: " + executorId) } else { logInfo("Registered executor: " + sender + " with ID " + executorId) - sender ! RegisteredExecutor(sparkProperties) + sender ! RegisteredExecutor executorActor(executorId) = sender executorHost(executorId) = Utils.parseHostPort(hostPort)._1 totalCores(executorId) = cores From 77ff32de99e1fc177bfe3a26bfa420a7825e752a Mon Sep 17 00:00:00 2001 From: Xiangrui Meng Date: Fri, 20 Jun 2014 15:13:23 -0700 Subject: [PATCH 5/5] organize imports --- .../spark/executor/CoarseGrainedExecutorBackend.scala | 8 ++++---- .../scheduler/cluster/CoarseGrainedClusterMessage.scala | 2 -- 2 files changed, 4 insertions(+), 6 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala index 62947eda90f0..b5fd334f4020 100644 --- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala +++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala @@ -21,16 +21,16 @@ import java.nio.ByteBuffer import scala.concurrent.Await -import akka.actor._ -import akka.remote._ +import akka.actor.{Actor, ActorSelection, Props} import akka.pattern.Patterns +import akka.remote.{RemotingLifecycleEvent, DisassociatedEvent} -import org.apache.spark.{SparkEnv, Logging, SecurityManager, SparkConf} +import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkEnv} import org.apache.spark.TaskState.TaskState import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.deploy.worker.WorkerWatcher -import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._ import org.apache.spark.scheduler.TaskDescription +import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._ import org.apache.spark.util.{AkkaUtils, Utils} private[spark] class CoarseGrainedExecutorBackend( diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala index 1d12ce78a65c..318e16552201 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala @@ -20,9 +20,7 @@ package org.apache.spark.scheduler.cluster import java.nio.ByteBuffer import org.apache.spark.TaskState.TaskState -import org.apache.spark.scheduler.TaskDescription import org.apache.spark.util.{SerializableBuffer, Utils} -import org.apache.spark.SparkConf private[spark] sealed trait CoarseGrainedClusterMessage extends Serializable