Skip to content

Commit 327ebf0

Browse files
Marcelo Vanzinsrowen
authored andcommitted
[core] [minor] Make sure ConnectionManager stops.
My previous fix (force a selector wakeup) didn't seem to work since I ran into the hang again. So change the code a bit to be more explicit about the condition when the selector thread should exit. Author: Marcelo Vanzin <[email protected]> Closes apache#5566 from vanzin/conn-mgr-hang and squashes the following commits: ddb2c03 [Marcelo Vanzin] [core] [minor] Make sure ConnectionManager stops.
1 parent 5f095d5 commit 327ebf0

File tree

1 file changed

+10
-6
lines changed

1 file changed

+10
-6
lines changed

core/src/main/scala/org/apache/spark/network/nio/ConnectionManager.scala

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -188,6 +188,7 @@ private[nio] class ConnectionManager(
188188
private val writeRunnableStarted: HashSet[SelectionKey] = new HashSet[SelectionKey]()
189189
private val readRunnableStarted: HashSet[SelectionKey] = new HashSet[SelectionKey]()
190190

191+
@volatile private var isActive = true
191192
private val selectorThread = new Thread("connection-manager-thread") {
192193
override def run(): Unit = ConnectionManager.this.run()
193194
}
@@ -342,7 +343,7 @@ private[nio] class ConnectionManager(
342343

343344
def run() {
344345
try {
345-
while(!selectorThread.isInterrupted) {
346+
while (isActive) {
346347
while (!registerRequests.isEmpty) {
347348
val conn: SendingConnection = registerRequests.dequeue()
348349
addListeners(conn)
@@ -398,7 +399,7 @@ private[nio] class ConnectionManager(
398399
} catch {
399400
// Explicitly only dealing with CancelledKeyException here since other exceptions
400401
// should be dealt with differently.
401-
case e: CancelledKeyException => {
402+
case e: CancelledKeyException =>
402403
// Some keys within the selectors list are invalid/closed. clear them.
403404
val allKeys = selector.keys().iterator()
404405

@@ -420,8 +421,11 @@ private[nio] class ConnectionManager(
420421
}
421422
}
422423
}
423-
}
424-
0
424+
0
425+
426+
case e: ClosedSelectorException =>
427+
logDebug("Failed select() as selector is closed.", e)
428+
return
425429
}
426430

427431
if (selectedKeysCount == 0) {
@@ -988,11 +992,11 @@ private[nio] class ConnectionManager(
988992
}
989993

990994
def stop() {
995+
isActive = false
991996
ackTimeoutMonitor.stop()
992-
selector.wakeup()
997+
selector.close()
993998
selectorThread.interrupt()
994999
selectorThread.join()
995-
selector.close()
9961000
val connections = connectionsByKey.values
9971001
connections.foreach(_.close())
9981002
if (connectionsByKey.size != 0) {

0 commit comments

Comments
 (0)