
Commit 7e49f10

A leak of event loops may be causing test failures.
Parent: d8176b1

13 files changed, +83 / -47 lines

13 files changed

+83
-47
lines changed

core/src/main/scala/org/apache/spark/SparkEnv.scala

Lines changed: 1 addition & 1 deletion
@@ -274,7 +274,7 @@ object SparkEnv extends Logging {
     val shuffleMemoryManager = new ShuffleMemoryManager(conf)

     val blockTransferService =
-      conf.get("spark.shuffle.blockTransferService", "nio").toLowerCase match {
+      conf.get("spark.shuffle.blockTransferService", "netty").toLowerCase match {
        case "netty" =>
          new NettyBlockTransferService(conf)
        case "nio" =>

core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala

Lines changed: 4 additions & 1 deletion
@@ -106,5 +106,8 @@ class NettyBlockTransferService(conf: SparkConf) extends BlockTransferService {
     result.future
   }

-  override def close(): Unit = server.close()
+  override def close(): Unit = {
+    server.close()
+    clientFactory.close()
+  }
 }
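The extra call matters because the Netty-based service owns event loop groups on both the server and the client side; stopping only the server leaves the client factory's worker threads alive. A minimal sketch of the leak, using plain Netty rather than Spark's wrappers (the class and field names below are illustrative, not Spark's):

    import io.netty.channel.nio.NioEventLoopGroup

    // Illustrative stand-in for a transfer service that owns two event loop groups.
    // NioEventLoopGroup threads are non-daemon by default, so a group that is never
    // shut down keeps its threads alive for the rest of the JVM.
    class LeakySketch {
      private val serverGroup = new NioEventLoopGroup(1)
      private val clientGroup = new NioEventLoopGroup(1)

      def close(): Unit = {
        serverGroup.shutdownGracefully()
        clientGroup.shutdownGracefully() // forgetting this line is the kind of leak being fixed
      }
    }

Every test that builds a fresh SparkEnv constructs such a service, so without both shutdowns the leaked threads accumulate across the suite.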

core/src/main/scala/org/apache/spark/storage/BlockManager.scala

Lines changed: 4 additions & 0 deletions
@@ -1178,6 +1178,10 @@ private[spark] class BlockManager(

   def stop(): Unit = {
     blockTransferService.close()
+    if (shuffleClient ne blockTransferService) {
+      // Closing should be idempotent, but maybe not for the NioBlockTransferService.
+      shuffleClient.close()
+    }
     diskBlockManager.stop()
     actorSystem.stop(slaveActor)
     blockInfo.clear()
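The guard relies on Scala's `ne` (reference inequality): when the shuffle client is the block transfer service itself it is closed once, and when it is a separate external client it gets its own close. A small illustrative sketch of the pattern (names hypothetical):

    import java.io.Closeable

    // Close each distinct resource exactly once, even if the two references alias.
    def stopBoth(transferService: Closeable, shuffleClient: Closeable): Unit = {
      transferService.close()
      if (shuffleClient ne transferService) { // reference check, not value equality
        shuffleClient.close()
      }
    }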

core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala

Lines changed: 17 additions & 17 deletions
@@ -25,7 +25,7 @@ import org.apache.spark.storage.BlockManagerId
 /**
  * Test add and remove behavior of ExecutorAllocationManager.
  */
-class ExecutorAllocationManagerSuite extends FunSuite {
+class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext {
   import ExecutorAllocationManager._
   import ExecutorAllocationManagerSuite._

@@ -36,17 +36,21 @@ class ExecutorAllocationManagerSuite extends FunSuite {
       .setAppName("test-executor-allocation-manager")
       .set("spark.dynamicAllocation.enabled", "true")
     intercept[SparkException] { new SparkContext(conf) }
+    SparkEnv.get.stop() // cleanup the created environment

     // Only min
     val conf1 = conf.clone().set("spark.dynamicAllocation.minExecutors", "1")
     intercept[SparkException] { new SparkContext(conf1) }
+    SparkEnv.get.stop()

     // Only max
     val conf2 = conf.clone().set("spark.dynamicAllocation.maxExecutors", "2")
     intercept[SparkException] { new SparkContext(conf2) }
+    SparkEnv.get.stop()

     // Both min and max, but min > max
     intercept[SparkException] { createSparkContext(2, 1) }
+    SparkEnv.get.stop()

     // Both min and max, and min == max
     val sc1 = createSparkContext(1, 1)
@@ -60,18 +64,17 @@ class ExecutorAllocationManagerSuite extends FunSuite {
   }

   test("starting state") {
-    val sc = createSparkContext()
+    sc = createSparkContext()
     val manager = sc.executorAllocationManager.get
     assert(numExecutorsPending(manager) === 0)
     assert(executorsPendingToRemove(manager).isEmpty)
     assert(executorIds(manager).isEmpty)
     assert(addTime(manager) === ExecutorAllocationManager.NOT_SET)
     assert(removeTimes(manager).isEmpty)
-    sc.stop()
   }

   test("add executors") {
-    val sc = createSparkContext(1, 10)
+    sc = createSparkContext(1, 10)
     val manager = sc.executorAllocationManager.get

     // Keep adding until the limit is reached
@@ -112,11 +115,10 @@ class ExecutorAllocationManagerSuite extends FunSuite {
     assert(addExecutors(manager) === 0)
     assert(numExecutorsPending(manager) === 6)
     assert(numExecutorsToAdd(manager) === 1)
-    sc.stop()
   }

   test("remove executors") {
-    val sc = createSparkContext(5, 10)
+    sc = createSparkContext(5, 10)
     val manager = sc.executorAllocationManager.get
     (1 to 10).map(_.toString).foreach { id => onExecutorAdded(manager, id) }

@@ -163,11 +165,10 @@ class ExecutorAllocationManagerSuite extends FunSuite {
     assert(executorsPendingToRemove(manager).isEmpty)
     assert(!removeExecutor(manager, "8"))
     assert(executorsPendingToRemove(manager).isEmpty)
-    sc.stop()
   }

   test ("interleaving add and remove") {
-    val sc = createSparkContext(5, 10)
+    sc = createSparkContext(5, 10)
     val manager = sc.executorAllocationManager.get

     // Add a few executors
@@ -232,11 +233,10 @@ class ExecutorAllocationManagerSuite extends FunSuite {
     onExecutorAdded(manager, "15")
     onExecutorAdded(manager, "16")
     assert(executorIds(manager).size === 10)
-    sc.stop()
   }

   test("starting/canceling add timer") {
-    val sc = createSparkContext(2, 10)
+    sc = createSparkContext(2, 10)
     val clock = new TestClock(8888L)
     val manager = sc.executorAllocationManager.get
     manager.setClock(clock)
@@ -268,7 +268,7 @@ class ExecutorAllocationManagerSuite extends FunSuite {
   }

   test("starting/canceling remove timers") {
-    val sc = createSparkContext(2, 10)
+    sc = createSparkContext(2, 10)
     val clock = new TestClock(14444L)
     val manager = sc.executorAllocationManager.get
     manager.setClock(clock)
@@ -313,7 +313,7 @@ class ExecutorAllocationManagerSuite extends FunSuite {
   }

   test("mock polling loop with no events") {
-    val sc = createSparkContext(1, 20)
+    sc = createSparkContext(1, 20)
     val manager = sc.executorAllocationManager.get
     val clock = new TestClock(2020L)
     manager.setClock(clock)
@@ -339,7 +339,7 @@ class ExecutorAllocationManagerSuite extends FunSuite {
   }

   test("mock polling loop add behavior") {
-    val sc = createSparkContext(1, 20)
+    sc = createSparkContext(1, 20)
     val clock = new TestClock(2020L)
     val manager = sc.executorAllocationManager.get
     manager.setClock(clock)
@@ -388,7 +388,7 @@ class ExecutorAllocationManagerSuite extends FunSuite {
   }

   test("mock polling loop remove behavior") {
-    val sc = createSparkContext(1, 20)
+    sc = createSparkContext(1, 20)
     val clock = new TestClock(2020L)
     val manager = sc.executorAllocationManager.get
     manager.setClock(clock)
@@ -449,7 +449,7 @@ class ExecutorAllocationManagerSuite extends FunSuite {
   }

   test("listeners trigger add executors correctly") {
-    val sc = createSparkContext(2, 10)
+    sc = createSparkContext(2, 10)
     val manager = sc.executorAllocationManager.get
     assert(addTime(manager) === NOT_SET)

@@ -479,7 +479,7 @@ class ExecutorAllocationManagerSuite extends FunSuite {
   }

   test("listeners trigger remove executors correctly") {
-    val sc = createSparkContext(2, 10)
+    sc = createSparkContext(2, 10)
     val manager = sc.executorAllocationManager.get
     assert(removeTimes(manager).isEmpty)

@@ -510,7 +510,7 @@ class ExecutorAllocationManagerSuite extends FunSuite {
   }

   test("listeners trigger add and remove executor callbacks correctly") {
-    val sc = createSparkContext(2, 10)
+    sc = createSparkContext(2, 10)
     val manager = sc.executorAllocationManager.get
     assert(executorIds(manager).isEmpty)
     assert(removeTimes(manager).isEmpty)
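Mixing in LocalSparkContext lets the trait own the SparkContext lifecycle, which is why the tests assign to `sc` instead of declaring a local `val` and why the per-test `sc.stop()` calls disappear. Roughly, such a mixin looks like the sketch below (simplified and renamed; Spark's real trait lives in core's test sources and does some extra cleanup such as resetting driver-port properties):

    import org.apache.spark.SparkContext
    import org.scalatest.{BeforeAndAfterEach, Suite}

    // Simplified sketch of a LocalSparkContext-style mixin: tests assign to `sc`,
    // and afterEach() stops it even when the test body fails.
    trait LocalSparkContextSketch extends BeforeAndAfterEach { self: Suite =>
      @transient var sc: SparkContext = _

      override def afterEach(): Unit = {
        try {
          if (sc != null) {
            sc.stop()
          }
          sc = null
        } finally {
          super.afterEach()
        }
      }
    }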

core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala

Lines changed: 20 additions & 1 deletion
@@ -28,7 +28,7 @@ import org.apache.spark.shuffle.FetchFailedException
 import org.apache.spark.storage.BlockManagerId
 import org.apache.spark.util.AkkaUtils

-class MapOutputTrackerSuite extends FunSuite with LocalSparkContext {
+class MapOutputTrackerSuite extends FunSuite {
   private val conf = new SparkConf

   test("master start and stop") {
@@ -37,6 +37,7 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext {
     tracker.trackerActor =
       actorSystem.actorOf(Props(new MapOutputTrackerMasterActor(tracker, conf)))
     tracker.stop()
+    actorSystem.shutdown()
   }

   test("master register shuffle and fetch") {
@@ -56,6 +57,7 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext {
     assert(statuses.toSeq === Seq((BlockManagerId("a", "hostA", 1000), size1000),
       (BlockManagerId("b", "hostB", 1000), size10000)))
     tracker.stop()
+    actorSystem.shutdown()
   }

   test("master register and unregister shuffle") {
@@ -74,6 +76,9 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext {
     tracker.unregisterShuffle(10)
     assert(!tracker.containsShuffle(10))
     assert(tracker.getServerStatuses(10, 0).isEmpty)
+
+    tracker.stop()
+    actorSystem.shutdown()
   }

   test("master register shuffle and unregister map output and fetch") {
@@ -97,6 +102,9 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext {
     // this should cause it to fail, and the scheduler will ignore the failure due to the
     // stage already being aborted.
     intercept[FetchFailedException] { tracker.getServerStatuses(10, 1) }
+
+    tracker.stop()
+    actorSystem.shutdown()
   }

   test("remote fetch") {
@@ -136,6 +144,11 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext {

     // failure should be cached
     intercept[FetchFailedException] { slaveTracker.getServerStatuses(10, 0) }
+
+    masterTracker.stop()
+    slaveTracker.stop()
+    actorSystem.shutdown()
+    slaveSystem.shutdown()
   }

   test("remote fetch below akka frame size") {
@@ -154,6 +167,9 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext {
     masterTracker.registerMapOutput(10, 0, MapStatus(
       BlockManagerId("88", "mph", 1000), Array.fill[Long](10)(0)))
     masterActor.receive(GetMapOutputStatuses(10))
+
+    // masterTracker.stop() // this throws an exception
+    actorSystem.shutdown()
   }

   test("remote fetch exceeds akka frame size") {
@@ -176,5 +192,8 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext {
       BlockManagerId("999", "mps", 1000), Array.fill[Long](4000000)(0)))
     }
     intercept[SparkException] { masterActor.receive(GetMapOutputStatuses(20)) }
+
+    // masterTracker.stop() // this throws an exception
+    actorSystem.shutdown()
   }
 }
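Each of these tests spins up its own Akka ActorSystem, and a system that is never shut down keeps its dispatcher threads alive for the rest of the JVM. A hedged sketch of the cleanup shape, assuming the blocking shutdown API Akka exposed in this era (helper name is illustrative):

    import akka.actor.ActorSystem

    // Per-test lifecycle: always shut the system down, even if assertions fail.
    def withActorSystem(testBody: ActorSystem => Unit): Unit = {
      val actorSystem = ActorSystem("test")
      try {
        testBody(actorSystem)
      } finally {
        actorSystem.shutdown()         // replaced by terminate() in later Akka releases
        actorSystem.awaitTermination() // block until the dispatcher threads have exited
      }
    }

The commit takes the simpler route of calling shutdown() at the end of each test body, which achieves the same effect for tests that pass.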

core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala

Lines changed: 3 additions & 3 deletions
@@ -17,20 +17,20 @@

 package org.apache.spark

-import org.scalatest.{BeforeAndAfterEach, FunSuite, PrivateMethodTester}
+import org.scalatest.{FunSuite, PrivateMethodTester}

 import org.apache.spark.scheduler.{SchedulerBackend, TaskScheduler, TaskSchedulerImpl}
 import org.apache.spark.scheduler.cluster.{SimrSchedulerBackend, SparkDeploySchedulerBackend}
 import org.apache.spark.scheduler.cluster.mesos.{CoarseMesosSchedulerBackend, MesosSchedulerBackend}
 import org.apache.spark.scheduler.local.LocalBackend

 class SparkContextSchedulerCreationSuite
-  extends FunSuite with PrivateMethodTester with Logging with BeforeAndAfterEach {
+  extends FunSuite with LocalSparkContext with PrivateMethodTester with Logging {

   def createTaskScheduler(master: String): TaskSchedulerImpl = {
     // Create local SparkContext to setup a SparkEnv. We don't actually want to start() the
     // real schedulers, so we don't want to create a full SparkContext with the desired scheduler.
-    val sc = new SparkContext("local", "test")
+    sc = new SparkContext("local", "test")
     val createTaskSchedulerMethod =
       PrivateMethod[Tuple2[SchedulerBackend, TaskScheduler]]('createTaskScheduler)
     val (_, sched) = SparkContext invokePrivate createTaskSchedulerMethod(sc, master)

external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala

Lines changed: 10 additions & 5 deletions
@@ -145,11 +145,16 @@ class FlumePollingStreamSuite extends TestSuiteBase {
     outputStream.register()

     ssc.start()
-    writeAndVerify(Seq(channel, channel2), ssc, outputBuffer)
-    assertChannelIsEmpty(channel)
-    assertChannelIsEmpty(channel2)
-    sink.stop()
-    channel.stop()
+    try {
+      writeAndVerify(Seq(channel, channel2), ssc, outputBuffer)
+      assertChannelIsEmpty(channel)
+      assertChannelIsEmpty(channel2)
+    } finally {
+      sink.stop()
+      sink2.stop()
+      channel.stop()
+      channel2.stop()
+    }
   }

   def writeAndVerify(channels: Seq[MemoryChannel], ssc: StreamingContext,

network/common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java

Lines changed: 2 additions & 1 deletion
@@ -58,7 +58,7 @@ public class TransportClientFactory implements Closeable {
   private final ConcurrentHashMap<SocketAddress, TransportClient> connectionPool;

   private final Class<? extends Channel> socketChannelClass;
-  private final EventLoopGroup workerGroup;
+  private EventLoopGroup workerGroup;

   public TransportClientFactory(TransportContext context) {
     this.context = context;
@@ -150,6 +150,7 @@ public void close() {

     if (workerGroup != null) {
       workerGroup.shutdownGracefully();
+      workerGroup = null;
     }
   }

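Dropping `final` is what allows close() to null the field after `shutdownGracefully()`, so a second close() becomes a harmless no-op instead of shutting the group down twice. The same idea in a compact Scala sketch (class name illustrative):

    import io.netty.channel.EventLoopGroup
    import io.netty.channel.nio.NioEventLoopGroup

    // Idempotent close: once the group has been shut down, the reference is cleared
    // so repeated close() calls do nothing.
    class ClientFactorySketch {
      private var workerGroup: EventLoopGroup = new NioEventLoopGroup()

      def close(): Unit = {
        if (workerGroup != null) {
          workerGroup.shutdownGracefully()
          workerGroup = null
        }
      }
    }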

network/common/src/main/java/org/apache/spark/network/server/TransportServer.java

Lines changed: 3 additions & 2 deletions
@@ -49,6 +49,7 @@ public class TransportServer implements Closeable {
   private ChannelFuture channelFuture;
   private int port = -1;

+  /** Creates a TransportServer that binds to the given port, or to any available if 0. */
   public TransportServer(TransportContext context, int portToBind) {
     this.context = context;
     this.conf = context.getConf();
@@ -67,7 +68,7 @@ private void init(int portToBind) {

     IOMode ioMode = IOMode.valueOf(conf.ioMode());
     EventLoopGroup bossGroup =
-        NettyUtils.createEventLoop(ioMode, conf.serverThreads(), "shuffle-server");
+      NettyUtils.createEventLoop(ioMode, conf.serverThreads(), "shuffle-server");
     EventLoopGroup workerGroup = bossGroup;

     bootstrap = new ServerBootstrap()
@@ -105,7 +106,7 @@ protected void initChannel(SocketChannel ch) throws Exception {
   @Override
   public void close() {
     if (channelFuture != null) {
-      // close is a local operation and should finish with milliseconds; timeout just to be safe
+      // close is a local operation and should finish within milliseconds; timeout just to be safe
       channelFuture.channel().close().awaitUninterruptibly(10, TimeUnit.SECONDS);
       channelFuture = null;
     }

network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java

Lines changed: 7 additions & 0 deletions
@@ -17,6 +17,8 @@

 package org.apache.spark.network.shuffle;

+import java.io.Closeable;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

@@ -85,4 +87,9 @@ public void registerWithShuffleServer(
     JavaUtils.serialize(new RegisterExecutor(appId, execId, executorInfo));
     client.sendRpcSync(registerExecutorMessage, 5000 /* timeoutMs */);
   }
+
+  @Override
+  public void close() {
+    clientFactory.close();
+  }
 }
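With close() implemented (and the Closeable import in place), callers such as BlockManager.stop() can release the client factory's transport threads, and a caller can manage the client like any other Closeable. A generic loan-style helper, shown as an assumption about how callers might use it rather than an API Spark provides:

    import java.io.Closeable

    // Loan pattern: run the body, then close the resource even if the body throws.
    def withResource[C <: Closeable, T](resource: C)(body: C => T): T = {
      try body(resource) finally resource.close()
    }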
