From fe530cddae1cb573387f7fc1eb344513c91d69bb Mon Sep 17 00:00:00 2001 From: Akshat Aranya Date: Thu, 21 May 2015 10:23:54 -0700 Subject: [PATCH 1/4] Speed up task scheduling in standalone mode by reusing serializer instead of creating a new one for each task. --- .../spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index f107148f3b8c6..def00bd0a579c 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -183,8 +183,8 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp // Launch tasks returned by a set of resource offers def launchTasks(tasks: Seq[Seq[TaskDescription]]) { + val ser = SparkEnv.get.closureSerializer.newInstance() for (task <- tasks.flatten) { - val ser = SparkEnv.get.closureSerializer.newInstance() val serializedTask = ser.serialize(task) if (serializedTask.limit >= akkaFrameSize - AkkaUtils.reservedSizeBytes) { val taskSetId = scheduler.taskIdToTaskSetId(task.taskId) From 0b8ca9389da1a46ae24e27f34e5174742d0a5912 Mon Sep 17 00:00:00 2001 From: Akshat Aranya Date: Thu, 21 May 2015 11:53:01 -0700 Subject: [PATCH 2/4] Incorporate review comments --- .../cluster/CoarseGrainedSchedulerBackend.scala | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index def00bd0a579c..38ffd544380f8 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -55,6 +55,9 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp val maxRegisteredWaitingTimeMs = conf.getTimeAsMs("spark.scheduler.maxRegisteredResourcesWaitingTime", "30s") val createTime = System.currentTimeMillis() + // If this CoarseGrainedSchedulerBackend is changed to support multiple threads, then this may need + // to be changed so that we don't share the serializer instance across threads + private val ser = SparkEnv.get.closureSerializer.newInstance() private val executorDataMap = new HashMap[String, ExecutorData] @@ -163,7 +166,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp } // Make fake resource offers on all executors - def makeOffers() { + private def makeOffers() { launchTasks(scheduler.resourceOffers(executorDataMap.map { case (id, executorData) => new WorkerOffer(id, executorData.executorHost, executorData.freeCores) }.toSeq)) @@ -175,15 +178,14 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp } // Make fake resource offers on just one executor - def makeOffers(executorId: String) { + private def makeOffers(executorId: String) { val executorData = executorDataMap(executorId) launchTasks(scheduler.resourceOffers( Seq(new WorkerOffer(executorId, executorData.executorHost, executorData.freeCores)))) } // Launch tasks returned by a set of resource offers - def launchTasks(tasks: Seq[Seq[TaskDescription]]) { - val ser = SparkEnv.get.closureSerializer.newInstance() + private def launchTasks(tasks: Seq[Seq[TaskDescription]]) { for (task <- tasks.flatten) { val serializedTask = ser.serialize(task) if (serializedTask.limit >= akkaFrameSize - AkkaUtils.reservedSizeBytes) { From bd4a5dd52e0081ed08d934e9330790b86236a6b1 Mon Sep 17 00:00:00 2001 From: Akshat Aranya Date: Thu, 21 May 2015 12:08:53 -0700 Subject: [PATCH 3/4] Style fix --- .../scheduler/cluster/CoarseGrainedSchedulerBackend.scala | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index 38ffd544380f8..754b091cc06f9 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -55,8 +55,9 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp val maxRegisteredWaitingTimeMs = conf.getTimeAsMs("spark.scheduler.maxRegisteredResourcesWaitingTime", "30s") val createTime = System.currentTimeMillis() - // If this CoarseGrainedSchedulerBackend is changed to support multiple threads, then this may need - // to be changed so that we don't share the serializer instance across threads + // If this CoarseGrainedSchedulerBackend is changed to support multiple threads, + // then this may need to be changed so that we don't share the serializer + // instance across threads private val ser = SparkEnv.get.closureSerializer.newInstance() private val executorDataMap = new HashMap[String, ExecutorData] From 12d8c9e190dcaee80285343ca826ec51db754034 Mon Sep 17 00:00:00 2001 From: Akshat Aranya Date: Fri, 22 May 2015 11:07:57 -0700 Subject: [PATCH 4/4] Reduce visibility of serializer --- .../cluster/CoarseGrainedSchedulerBackend.scala | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index 754b091cc06f9..c5bc6294a5577 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -55,10 +55,6 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp val maxRegisteredWaitingTimeMs = conf.getTimeAsMs("spark.scheduler.maxRegisteredResourcesWaitingTime", "30s") val createTime = System.currentTimeMillis() - // If this CoarseGrainedSchedulerBackend is changed to support multiple threads, - // then this may need to be changed so that we don't share the serializer - // instance across threads - private val ser = SparkEnv.get.closureSerializer.newInstance() private val executorDataMap = new HashMap[String, ExecutorData] @@ -73,6 +69,11 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp class DriverEndpoint(override val rpcEnv: RpcEnv, sparkProperties: Seq[(String, String)]) extends ThreadSafeRpcEndpoint with Logging { + // If this DriverEndpoint is changed to support multiple threads, + // then this may need to be changed so that we don't share the serializer + // instance across threads + private val ser = SparkEnv.get.closureSerializer.newInstance() + override protected def log = CoarseGrainedSchedulerBackend.this.log private val addressToExecutorId = new HashMap[RpcAddress, String]