@@ -249,7 +249,7 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf,
249249 def run () {
250250 try {
251251 while (! selectorThread.isInterrupted) {
252- while (! registerRequests.isEmpty) {
252+ while (! registerRequests.isEmpty) {
253253 val conn : SendingConnection = registerRequests.dequeue()
254254 addListeners(conn)
255255 conn.connect()
@@ -308,7 +308,7 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf,
308308 // Some keys within the selectors list are invalid/closed. clear them.
309309 val allKeys = selector.keys().iterator()
310310
311- while (allKeys.hasNext() ) {
311+ while (allKeys.hasNext) {
312312 val key = allKeys.next()
313313 try {
314314 if (! key.isValid) {
@@ -341,7 +341,7 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf,
341341
342342 if (0 != selectedKeysCount) {
343343 val selectedKeys = selector.selectedKeys().iterator()
344- while (selectedKeys.hasNext() ) {
344+ while (selectedKeys.hasNext) {
345345 val key = selectedKeys.next
346346 selectedKeys.remove()
347347 try {
@@ -419,62 +419,63 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf,
419419 connectionsByKey -= connection.key
420420
421421 try {
422- if (connection.isInstanceOf [SendingConnection ]) {
423- val sendingConnection = connection.asInstanceOf [SendingConnection ]
424- val sendingConnectionManagerId = sendingConnection.getRemoteConnectionManagerId()
425- logInfo(" Removing SendingConnection to " + sendingConnectionManagerId)
426-
427- connectionsById -= sendingConnectionManagerId
428- connectionsAwaitingSasl -= connection.connectionId
422+ connection match {
423+ case sendingConnection : SendingConnection =>
424+ val sendingConnectionManagerId = sendingConnection.getRemoteConnectionManagerId()
425+ logInfo(" Removing SendingConnection to " + sendingConnectionManagerId)
426+
427+ connectionsById -= sendingConnectionManagerId
428+ connectionsAwaitingSasl -= connection.connectionId
429+
430+ messageStatuses.synchronized {
431+ messageStatuses.values.filter(_.connectionManagerId == sendingConnectionManagerId)
432+ .foreach(status => {
433+ logInfo(" Notifying " + status)
434+ status.synchronized {
435+ status.attempted = true
436+ status.acked = false
437+ status.markDone()
438+ }
439+ })
429440
430- messageStatuses.synchronized {
431- messageStatuses
432- .values.filter(_.connectionManagerId == sendingConnectionManagerId).foreach(status => {
433- logInfo(" Notifying " + status)
434- status.synchronized {
435- status.attempted = true
436- status.acked = false
437- status.markDone()
438- }
441+ messageStatuses.retain((i, status) => {
442+ status.connectionManagerId != sendingConnectionManagerId
439443 })
444+ }
445+ case receivingConnection : ReceivingConnection =>
446+ val remoteConnectionManagerId = receivingConnection.getRemoteConnectionManagerId()
447+ logInfo(" Removing ReceivingConnection to " + remoteConnectionManagerId)
440448
441- messageStatuses.retain((i, status) => {
442- status.connectionManagerId != sendingConnectionManagerId
443- })
444- }
445- } else if (connection.isInstanceOf [ReceivingConnection ]) {
446- val receivingConnection = connection.asInstanceOf [ReceivingConnection ]
447- val remoteConnectionManagerId = receivingConnection.getRemoteConnectionManagerId()
448- logInfo(" Removing ReceivingConnection to " + remoteConnectionManagerId)
449-
450- val sendingConnectionOpt = connectionsById.get(remoteConnectionManagerId)
451- if (! sendingConnectionOpt.isDefined) {
452- logError(" Corresponding SendingConnectionManagerId not found" )
453- return
454- }
449+ val sendingConnectionOpt = connectionsById.get(remoteConnectionManagerId)
450+ if (! sendingConnectionOpt.isDefined) {
451+ logError(" Corresponding SendingConnectionManagerId not found" )
452+ return
453+ }
455454
456- val sendingConnection = sendingConnectionOpt.get
457- connectionsById -= remoteConnectionManagerId
458- sendingConnection.close()
455+ val sendingConnection = sendingConnectionOpt.get
456+ connectionsById -= remoteConnectionManagerId
457+ sendingConnection.close()
459458
460- val sendingConnectionManagerId = sendingConnection.getRemoteConnectionManagerId()
459+ val sendingConnectionManagerId = sendingConnection.getRemoteConnectionManagerId()
461460
462- assert (sendingConnectionManagerId == remoteConnectionManagerId)
461+ assert (sendingConnectionManagerId == remoteConnectionManagerId)
463462
464- messageStatuses.synchronized {
465- for (s <- messageStatuses.values if s.connectionManagerId == sendingConnectionManagerId) {
466- logInfo(" Notifying " + s)
467- s.synchronized {
468- s.attempted = true
469- s.acked = false
470- s.markDone()
463+ messageStatuses.synchronized {
464+ for (s <- messageStatuses.values
465+ if s.connectionManagerId == sendingConnectionManagerId) {
466+ logInfo(" Notifying " + s)
467+ s.synchronized {
468+ s.attempted = true
469+ s.acked = false
470+ s.markDone()
471+ }
471472 }
472- }
473473
474- messageStatuses.retain((i, status) => {
475- status.connectionManagerId != sendingConnectionManagerId
476- })
477- }
474+ messageStatuses.retain((i, status) => {
475+ status.connectionManagerId != sendingConnectionManagerId
476+ })
477+ }
478+ case _ => logError(" Unsupported type of connection." )
478479 }
479480 } finally {
480481 // So that the selection keys can be removed.
@@ -517,13 +518,13 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf,
517518 logDebug(" Client sasl completed for id: " + waitingConn.connectionId)
518519 connectionsAwaitingSasl -= waitingConn.connectionId
519520 waitingConn.getAuthenticated().synchronized {
520- waitingConn.getAuthenticated().notifyAll();
521+ waitingConn.getAuthenticated().notifyAll()
521522 }
522523 return
523524 } else {
524525 var replyToken : Array [Byte ] = null
525526 try {
526- replyToken = waitingConn.sparkSaslClient.saslResponse(securityMsg.getToken);
527+ replyToken = waitingConn.sparkSaslClient.saslResponse(securityMsg.getToken)
527528 if (waitingConn.isSaslComplete()) {
528529 logDebug(" Client sasl completed after evaluate for id: " + waitingConn.connectionId)
529530 connectionsAwaitingSasl -= waitingConn.connectionId
@@ -533,7 +534,7 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf,
533534 return
534535 }
535536 val securityMsgResp = SecurityMessage .fromResponse(replyToken,
536- securityMsg.getConnectionId.toString() )
537+ securityMsg.getConnectionId.toString)
537538 val message = securityMsgResp.toBufferMessage
538539 if (message == null ) throw new Exception (" Error creating security message" )
539540 sendSecurityMessage(waitingConn.getRemoteConnectionManagerId(), message)
@@ -630,13 +631,13 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf,
630631 case bufferMessage : BufferMessage => {
631632 if (authEnabled) {
632633 val res = handleAuthentication(connection, bufferMessage)
633- if (res == true ) {
634+ if (res) {
634635 // message was security negotiation so skip the rest
635636 logDebug(" After handleAuth result was true, returning" )
636637 return
637638 }
638639 }
639- if (bufferMessage.hasAckId) {
640+ if (bufferMessage.hasAckId() ) {
640641 val sentMessageStatus = messageStatuses.synchronized {
641642 messageStatuses.get(bufferMessage.ackId) match {
642643 case Some (status) => {
@@ -646,7 +647,6 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf,
646647 case None => {
647648 throw new Exception (" Could not find reference for received ack message " +
648649 message.id)
649- null
650650 }
651651 }
652652 }
@@ -668,7 +668,7 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf,
668668 if (ackMessage.isDefined) {
669669 if (! ackMessage.get.isInstanceOf [BufferMessage ]) {
670670 logDebug(" Response to " + bufferMessage + " is not a buffer message, it is of type "
671- + ackMessage.get.getClass() )
671+ + ackMessage.get.getClass)
672672 } else if (! ackMessage.get.asInstanceOf [BufferMessage ].hasAckId) {
673673 logDebug(" Response to " + bufferMessage + " does not have ack id set" )
674674 ackMessage.get.asInstanceOf [BufferMessage ].ackId = bufferMessage.id
0 commit comments