Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,7 @@ private[nio] class ConnectionManager(
private val writeRunnableStarted: HashSet[SelectionKey] = new HashSet[SelectionKey]()
private val readRunnableStarted: HashSet[SelectionKey] = new HashSet[SelectionKey]()

@volatile private var isActive = true
private val selectorThread = new Thread("connection-manager-thread") {
override def run(): Unit = ConnectionManager.this.run()
}
Expand Down Expand Up @@ -342,7 +343,7 @@ private[nio] class ConnectionManager(

def run() {
try {
while(!selectorThread.isInterrupted) {
while (isActive) {
while (!registerRequests.isEmpty) {
val conn: SendingConnection = registerRequests.dequeue()
addListeners(conn)
Expand Down Expand Up @@ -398,7 +399,7 @@ private[nio] class ConnectionManager(
} catch {
// Explicitly only dealing with CancelledKeyException here since other exceptions
// should be dealt with differently.
case e: CancelledKeyException => {
case e: CancelledKeyException =>
// Some keys within the selectors list are invalid/closed. clear them.
val allKeys = selector.keys().iterator()

Expand All @@ -420,8 +421,11 @@ private[nio] class ConnectionManager(
}
}
}
}
0
0

case e: ClosedSelectorException =>
logDebug("Failed select() as selector is closed.", e)
return
}

if (selectedKeysCount == 0) {
Expand Down Expand Up @@ -988,11 +992,11 @@ private[nio] class ConnectionManager(
}

def stop() {
isActive = false
ackTimeoutMonitor.stop()
selector.wakeup()
selector.close()
selectorThread.interrupt()
selectorThread.join()
selector.close()
val connections = connectionsByKey.values
connections.foreach(_.close())
if (connectionsByKey.size != 0) {
Expand Down