@@ -188,6 +188,7 @@ private[nio] class ConnectionManager(
188188 private val writeRunnableStarted : HashSet [SelectionKey ] = new HashSet [SelectionKey ]()
189189 private val readRunnableStarted : HashSet [SelectionKey ] = new HashSet [SelectionKey ]()
190190
191+ @ volatile private var isActive = true
191192 private val selectorThread = new Thread (" connection-manager-thread" ) {
192193 override def run (): Unit = ConnectionManager .this .run()
193194 }
@@ -342,7 +343,7 @@ private[nio] class ConnectionManager(
342343
343344 def run () {
344345 try {
345- while ( ! selectorThread.isInterrupted ) {
346+ while (isActive ) {
346347 while (! registerRequests.isEmpty) {
347348 val conn : SendingConnection = registerRequests.dequeue()
348349 addListeners(conn)
@@ -398,7 +399,7 @@ private[nio] class ConnectionManager(
398399 } catch {
399400 // Explicitly only dealing with CancelledKeyException here since other exceptions
400401 // should be dealt with differently.
401- case e : CancelledKeyException => {
402+ case e : CancelledKeyException =>
402403 // Some keys within the selectors list are invalid/closed. clear them.
403404 val allKeys = selector.keys().iterator()
404405
@@ -420,8 +421,11 @@ private[nio] class ConnectionManager(
420421 }
421422 }
422423 }
423- }
424- 0
424+ 0
425+
426+ case e : ClosedSelectorException =>
427+ logDebug(" Failed select() as selector is closed." , e)
428+ return
425429 }
426430
427431 if (selectedKeysCount == 0 ) {
@@ -988,11 +992,11 @@ private[nio] class ConnectionManager(
988992 }
989993
990994 def stop () {
995+ isActive = false
991996 ackTimeoutMonitor.stop()
992- selector.wakeup ()
997+ selector.close ()
993998 selectorThread.interrupt()
994999 selectorThread.join()
995- selector.close()
9961000 val connections = connectionsByKey.values
9971001 connections.foreach(_.close())
9981002 if (connectionsByKey.size != 0 ) {
0 commit comments