From 139e87558d728c5ae4ccf297c1702a73d5573335 Mon Sep 17 00:00:00 2001
From: Pete Robbins
Date: Fri, 27 May 2016 10:32:49 +0100
Subject: [PATCH 1/5] Use a minimum of 3 dispatcher threads to avoid deadlocks

---
 .../src/main/scala/org/apache/spark/rpc/netty/Dispatcher.scala | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/rpc/netty/Dispatcher.scala b/core/src/main/scala/org/apache/spark/rpc/netty/Dispatcher.scala
index 4f8fe018b432d..3bf12ef4bfaf7 100644
--- a/core/src/main/scala/org/apache/spark/rpc/netty/Dispatcher.scala
+++ b/core/src/main/scala/org/apache/spark/rpc/netty/Dispatcher.scala
@@ -192,8 +192,9 @@ private[netty] class Dispatcher(nettyEnv: NettyRpcEnv) extends Logging {
 
   /** Thread pool used for dispatching messages. */
   private val threadpool: ThreadPoolExecutor = {
+    // Use a minimum of 3 threads to avoid deadlocks
     val numThreads = nettyEnv.conf.getInt("spark.rpc.netty.dispatcher.numThreads",
-      math.max(2, Runtime.getRuntime.availableProcessors()))
+      math.max(3, Runtime.getRuntime.availableProcessors()))
     val pool = ThreadUtils.newDaemonFixedThreadPool(numThreads, "dispatcher-event-loop")
     for (i <- 0 until numThreads) {
       pool.execute(new MessageLoop)
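
The deadlock this first patch mitigates is ordinary thread-pool starvation: every dispatcher thread ends up parked waiting for a reply that only another dispatcher thread can deliver, so no thread is left to make progress. A minimal, self-contained sketch of the failure mode, using plain java.util.concurrent rather than the Spark Dispatcher (all names here are illustrative):

    import java.util.concurrent.{Executors, TimeUnit}

    object PoolStarvationSketch {
      def main(args: Array[String]): Unit = {
        // A fixed pool of 2, the old floor for dispatcher-event-loop threads.
        val pool = Executors.newFixedThreadPool(2)
        // Two outer tasks fill the pool, then each blocks on an inner task
        // submitted to the same pool. The inner tasks can never be scheduled,
        // so both get() calls wait forever: a deadlock, not mere slowness.
        for (i <- 1 to 2) {
          pool.submit(new Runnable {
            override def run(): Unit = {
              val inner = pool.submit(new Runnable {
                override def run(): Unit = println(s"inner task $i ran")
              })
              inner.get() // parks a pool thread while occupying it
            }
          })
        }
        // shutdown() was never called, so this always times out; it is just
        // a convenient way to observe that the pool is wedged.
        if (!pool.awaitTermination(5, TimeUnit.SECONDS)) {
          println("pool still blocked after 5s: starved")
        }
        pool.shutdownNow()
      }
    }

Raising the floor from 2 to 3 threads makes this interleaving harder to hit but does not eliminate it, which is why the remaining patches stop blocking on the dispatcher instead.
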
From c991d701c125036d4008d7ed05a829293e45f5eb Mon Sep 17 00:00:00 2001
From: Pete Robbins
Date: Tue, 31 May 2016 22:54:58 +0100
Subject: [PATCH 2/5] Don't block in BlockManagerMaster.removeExecutor

---
 .../apache/spark/rpc/netty/Dispatcher.scala |  3 +-
 .../spark/storage/BlockManagerMaster.scala  |  3 +-
 .../spark/DistributedSuiteMinTHreads.scala  | 42 +++++++++++++++++++
 3 files changed, 45 insertions(+), 3 deletions(-)
 create mode 100644 core/src/test/scala/org/apache/spark/DistributedSuiteMinTHreads.scala

diff --git a/core/src/main/scala/org/apache/spark/rpc/netty/Dispatcher.scala b/core/src/main/scala/org/apache/spark/rpc/netty/Dispatcher.scala
index 3bf12ef4bfaf7..4f8fe018b432d 100644
--- a/core/src/main/scala/org/apache/spark/rpc/netty/Dispatcher.scala
+++ b/core/src/main/scala/org/apache/spark/rpc/netty/Dispatcher.scala
@@ -192,9 +192,8 @@ private[netty] class Dispatcher(nettyEnv: NettyRpcEnv) extends Logging {
 
   /** Thread pool used for dispatching messages. */
   private val threadpool: ThreadPoolExecutor = {
-    // Use a minimum of 3 threads to avoid deadlocks
     val numThreads = nettyEnv.conf.getInt("spark.rpc.netty.dispatcher.numThreads",
-      math.max(3, Runtime.getRuntime.availableProcessors()))
+      math.max(2, Runtime.getRuntime.availableProcessors()))
     val pool = ThreadUtils.newDaemonFixedThreadPool(numThreads, "dispatcher-event-loop")
     for (i <- 0 until numThreads) {
       pool.execute(new MessageLoop)
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
index c22d2e0fb61fa..b3a725d37b2de 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
@@ -38,7 +38,8 @@ class BlockManagerMaster(
 
   /** Remove a dead executor from the driver endpoint. This is only called on the driver side. */
   def removeExecutor(execId: String) {
-    tell(RemoveExecutor(execId))
+    // Avoid potential deadlocks by using a non-blocking call
+    driverEndpoint.ask[Boolean](RemoveExecutor(execId))
     logInfo("Removed " + execId + " successfully in removeExecutor")
   }
 
diff --git a/core/src/test/scala/org/apache/spark/DistributedSuiteMinTHreads.scala b/core/src/test/scala/org/apache/spark/DistributedSuiteMinTHreads.scala
new file mode 100644
index 0000000000000..98925d17fc70a
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/DistributedSuiteMinTHreads.scala
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+
+class DistributedSuiteMinThreads extends DistributedSuite {
+
+  // This suite runs DistributedSuite with the number of dispatcher
+  // threads set to the minimum of 2 to help identify deadlocks
+  
+  val numThreads = System.getProperty("spark.rpc.netty.dispatcher.numThreads")
+
+  override def beforeAll() {
+    super.beforeAll()
+    System.setProperty("spark.rpc.netty.dispatcher.numThreads", "2")
+  }
+
+  override def afterAll() {
+    if (numThreads == null) {
+      System.clearProperty("spark.rpc.netty.dispatcher.numThreads")
+    } else {
+      System.setProperty("spark.rpc.netty.dispatcher.numThreads", numThreads)
+    }
+    super.afterAll()
+  }
+
+}
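
The substance of this patch is the switch from tell, which waits for the driver's reply, to a fire-and-forget ask whose future is dropped. A sketch of the two call shapes, with plain scala.concurrent futures standing in for the RPC layer (askDriver is a stand-in, not a Spark API):

    import scala.concurrent.{Await, Future}
    import scala.concurrent.ExecutionContext.Implicits.global
    import scala.concurrent.duration._

    object AskVsTellSketch {
      // Stand-in for driverEndpoint.ask[Boolean](RemoveExecutor(execId)):
      // anything answered on another thread that returns a Future.
      def askDriver(execId: String): Future[Boolean] = Future { true }

      def main(args: Array[String]): Unit = {
        // Blocking shape, what tell() amounts to: the calling thread is
        // parked until the reply arrives, which is fatal when the reply can
        // only be produced by the same, already-exhausted dispatcher pool.
        val removed = Await.result(askDriver("exec-1"), 10.seconds)
        println(s"blocking removal returned $removed")

        // Non-blocking shape, what the patch switches to: send the request
        // and return at once; observe failure on a callback, if at all.
        askDriver("exec-2").failed.foreach { t =>
          println(s"removal of exec-2 failed: $t")
        }
      }
    }

Note the trade-off the patch accepts: the "Removed ... successfully" log line now fires before the removal is confirmed, and a failed removal is silent unless a callback is attached. The next patch tidies this up by giving the non-blocking path its own name and log message.
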
From 6a95c50435f026b270c335763a35ac16c5ab89ae Mon Sep 17 00:00:00 2001
From: Pete Robbins
Date: Wed, 1 Jun 2016 07:36:24 +0100
Subject: [PATCH 3/5] Create new removeExecutorAsync method

---
 .../cluster/CoarseGrainedSchedulerBackend.scala        |  2 +-
 .../org/apache/spark/storage/BlockManagerMaster.scala  | 11 +++++++++--
 .../org/apache/spark/DistributedSuiteMinTHreads.scala  |  2 +-
 3 files changed, 11 insertions(+), 4 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 0fea9c123bcfb..e84cb6346d51c 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -295,7 +295,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
         // manager to reregister itself. If that happens, the block manager master will know
         // about the executor, but the scheduler will not. Therefore, we should remove the
         // executor from the block manager when we hit this case.
-        scheduler.sc.env.blockManager.master.removeExecutor(executorId)
+        scheduler.sc.env.blockManager.master.removeExecutorAsync(executorId)
         logInfo(s"Asked to remove non-existent executor $executorId")
       }
     }
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
index b3a725d37b2de..f229a607c6cec 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
@@ -38,11 +38,18 @@ class BlockManagerMaster(
 
   /** Remove a dead executor from the driver endpoint. This is only called on the driver side. */
   def removeExecutor(execId: String) {
-    // Avoid potential deadlocks by using a non-blocking call
-    driverEndpoint.ask[Boolean](RemoveExecutor(execId))
+    tell(RemoveExecutor(execId))
     logInfo("Removed " + execId + " successfully in removeExecutor")
   }
 
+  /** Request removal of a dead executor from the driver endpoint. 
+   * This is only called on the driver side. Non-blocking. 
+   */
+  def removeExecutorAsync(execId: String) {
+    driverEndpoint.ask[Boolean](RemoveExecutor(execId))
+    logInfo("Removal of executor " + execId + " requested")
+  }
+
   /** Register the BlockManager's id with the driver. */
   def registerBlockManager(
       blockManagerId: BlockManagerId, maxMemSize: Long, slaveEndpoint: RpcEndpointRef): Unit = {
diff --git a/core/src/test/scala/org/apache/spark/DistributedSuiteMinTHreads.scala b/core/src/test/scala/org/apache/spark/DistributedSuiteMinTHreads.scala
index 98925d17fc70a..b3d3c667f958c 100644
--- a/core/src/test/scala/org/apache/spark/DistributedSuiteMinTHreads.scala
+++ b/core/src/test/scala/org/apache/spark/DistributedSuiteMinTHreads.scala
@@ -22,7 +22,7 @@ class DistributedSuiteMinThreads extends DistributedSuite {
 
   // This suite runs DistributedSuite with the number of dispatcher
   // threads set to the minimum of 2 to help identify deadlocks
-  
+
   val numThreads = System.getProperty("spark.rpc.netty.dispatcher.numThreads")
 
   override def beforeAll() {
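
Rather than quietly changing the semantics of removeExecutor, this patch restores the blocking call for callers that can afford to wait and adds an explicitly non-blocking variant for callers already running on a dispatcher thread, such as the RPC handler in CoarseGrainedSchedulerBackend. A sketch of the resulting two-method shape (the ExecutorRegistry class is hypothetical; only the pattern is taken from the patch):

    import scala.concurrent.{Await, Future}
    import scala.concurrent.ExecutionContext.Implicits.global
    import scala.concurrent.duration._

    // One Future-returning primitive, two wrappers: a blocking one for
    // ordinary threads and an async one that is safe on dispatcher threads.
    class ExecutorRegistry {
      private def requestRemoval(execId: String): Future[Boolean] =
        Future { true } // stand-in for the RPC round trip to the driver

      // Blocks until the driver confirms; must not run on a dispatcher thread.
      def removeExecutor(execId: String): Unit = {
        val ok = Await.result(requestRemoval(execId), 30.seconds)
        require(ok, s"driver failed to remove executor $execId")
      }

      // Returns immediately; failures are only logged.
      def removeExecutorAsync(execId: String): Unit = {
        requestRemoval(execId).failed.foreach { t =>
          println(s"removal of executor $execId failed: $t")
        }
      }
    }
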
From eb900d813aa367b5b5b6962f41021cb9a368ee2c Mon Sep 17 00:00:00 2001
From: Pete Robbins
Date: Wed, 1 Jun 2016 08:12:29 +0100
Subject: [PATCH 4/5] Scalastyle fix

---
 .../scala/org/apache/spark/storage/BlockManagerMaster.scala | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
index f229a607c6cec..0bbc2d2b5ca1b 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
@@ -42,8 +42,8 @@ class BlockManagerMaster(
     logInfo("Removed " + execId + " successfully in removeExecutor")
   }
 
-  /** Request removal of a dead executor from the driver endpoint. 
-   * This is only called on the driver side. Non-blocking. 
+  /** Request removal of a dead executor from the driver endpoint.
+   * This is only called on the driver side. Non-blocking.
    */
   def removeExecutorAsync(execId: String) {
     driverEndpoint.ask[Boolean](RemoveExecutor(execId))

From 4d3fc6ffb32c6e07e1f97b22df5497ce0d8c2096 Mon Sep 17 00:00:00 2001
From: Pete Robbins
Date: Wed, 1 Jun 2016 19:44:48 +0100
Subject: [PATCH 5/5] Remove slow test suite

---
 .../spark/DistributedSuiteMinTHreads.scala | 42 -------------------
 1 file changed, 42 deletions(-)
 delete mode 100644 core/src/test/scala/org/apache/spark/DistributedSuiteMinTHreads.scala

diff --git a/core/src/test/scala/org/apache/spark/DistributedSuiteMinTHreads.scala b/core/src/test/scala/org/apache/spark/DistributedSuiteMinTHreads.scala
deleted file mode 100644
index b3d3c667f958c..0000000000000
--- a/core/src/test/scala/org/apache/spark/DistributedSuiteMinTHreads.scala
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark
-
-
-class DistributedSuiteMinThreads extends DistributedSuite {
-
-  // This suite runs DistributedSuite with the number of dispatcher
-  // threads set to the minimum of 2 to help identify deadlocks
-
-  val numThreads = System.getProperty("spark.rpc.netty.dispatcher.numThreads")
-
-  override def beforeAll() {
-    super.beforeAll()
-    System.setProperty("spark.rpc.netty.dispatcher.numThreads", "2")
-  }
-
-  override def afterAll() {
-    if (numThreads == null) {
-      System.clearProperty("spark.rpc.netty.dispatcher.numThreads")
-    } else {
-      System.setProperty("spark.rpc.netty.dispatcher.numThreads", numThreads)
-    }
-    super.afterAll()
-  }
-
-}
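
Although the dedicated suite is dropped here for being slow, the scenario it covered can still be reproduced by pinning the dispatcher pool to its 2-thread floor through the same configuration key the suite set as a system property. A sketch under that assumption (the job itself is illustrative; local-cluster mode is what DistributedSuite uses):

    import org.apache.spark.{SparkConf, SparkContext}

    object MinThreadsRepro {
      def main(args: Array[String]): Unit = {
        // Force the 2-thread dispatcher floor that provoked the deadlock.
        val conf = new SparkConf()
          .setMaster("local-cluster[2,1,1024]")
          .setAppName("dispatcher-min-threads-repro")
          .set("spark.rpc.netty.dispatcher.numThreads", "2")
        val sc = new SparkContext(conf)
        try {
          // Any job exercising executor registration and removal will do;
          // before the fix this could hang once the pool was exhausted.
          val sum = sc.parallelize(1 to 1000, 8).sum()
          assert(sum == 500500.0, s"unexpected sum: $sum")
        } finally {
          sc.stop()
        }
      }
    }
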