@@ -249,7 +249,7 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf,
249249 def run () {
250250 try {
251251 while (! selectorThread.isInterrupted) {
252- while (! registerRequests.isEmpty) {
252+ while (! registerRequests.isEmpty) {
253253 val conn : SendingConnection = registerRequests.dequeue()
254254 addListeners(conn)
255255 conn.connect()
@@ -308,7 +308,7 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf,
308308 // Some keys within the selectors list are invalid/closed. clear them.
309309 val allKeys = selector.keys().iterator()
310310
311- while (allKeys.hasNext() ) {
311+ while (allKeys.hasNext) {
312312 val key = allKeys.next()
313313 try {
314314 if (! key.isValid) {
@@ -341,7 +341,7 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf,
341341
342342 if (0 != selectedKeysCount) {
343343 val selectedKeys = selector.selectedKeys().iterator()
344- while (selectedKeys.hasNext() ) {
344+ while (selectedKeys.hasNext) {
345345 val key = selectedKeys.next
346346 selectedKeys.remove()
347347 try {
@@ -419,62 +419,62 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf,
419419 connectionsByKey -= connection.key
420420
421421 try {
422- if ( connection. isInstanceOf [ SendingConnection ]) {
423- val sendingConnection = connection. asInstanceOf [ SendingConnection ]
424- val sendingConnectionManagerId = sendingConnection.getRemoteConnectionManagerId()
425- logInfo(" Removing SendingConnection to " + sendingConnectionManagerId)
422+ connection match {
423+ case sendingConnection : SendingConnection =>
424+ val sendingConnectionManagerId = sendingConnection.getRemoteConnectionManagerId()
425+ logInfo(" Removing SendingConnection to " + sendingConnectionManagerId)
426426
427- connectionsById -= sendingConnectionManagerId
428- connectionsAwaitingSasl -= connection.connectionId
427+ connectionsById -= sendingConnectionManagerId
428+ connectionsAwaitingSasl -= connection.connectionId
429429
430- messageStatuses.synchronized {
431- messageStatuses
432- .values.filter(_.connectionManagerId == sendingConnectionManagerId).foreach(status => {
430+ messageStatuses.synchronized {
431+ messageStatuses
432+ .values.filter(_.connectionManagerId == sendingConnectionManagerId).foreach(status => {
433433 logInfo(" Notifying " + status)
434434 status.synchronized {
435- status.attempted = true
436- status.acked = false
437- status.markDone()
435+ status.attempted = true
436+ status.acked = false
437+ status.markDone()
438438 }
439439 })
440440
441- messageStatuses.retain((i, status) => {
442- status.connectionManagerId != sendingConnectionManagerId
443- })
444- }
445- } else if (connection.isInstanceOf [ReceivingConnection ]) {
446- val receivingConnection = connection.asInstanceOf [ReceivingConnection ]
447- val remoteConnectionManagerId = receivingConnection.getRemoteConnectionManagerId()
448- logInfo(" Removing ReceivingConnection to " + remoteConnectionManagerId)
449-
450- val sendingConnectionOpt = connectionsById.get(remoteConnectionManagerId)
451- if (! sendingConnectionOpt.isDefined) {
452- logError(" Corresponding SendingConnectionManagerId not found" )
453- return
454- }
441+ messageStatuses.retain((i, status) => {
442+ status.connectionManagerId != sendingConnectionManagerId
443+ })
444+ }
445+ case receivingConnection : ReceivingConnection =>
446+ val remoteConnectionManagerId = receivingConnection.getRemoteConnectionManagerId()
447+ logInfo(" Removing ReceivingConnection to " + remoteConnectionManagerId)
455448
456- val sendingConnection = sendingConnectionOpt.get
457- connectionsById -= remoteConnectionManagerId
458- sendingConnection.close()
449+ val sendingConnectionOpt = connectionsById.get(remoteConnectionManagerId)
450+ if (! sendingConnectionOpt.isDefined) {
451+ logError(" Corresponding SendingConnectionManagerId not found" )
452+ return
453+ }
459454
460- val sendingConnectionManagerId = sendingConnection.getRemoteConnectionManagerId()
455+ val sendingConnection = sendingConnectionOpt.get
456+ connectionsById -= remoteConnectionManagerId
457+ sendingConnection.close()
461458
462- assert ( sendingConnectionManagerId == remoteConnectionManagerId )
459+ val sendingConnectionManagerId = sendingConnection.getRemoteConnectionManagerId( )
463460
464- messageStatuses.synchronized {
465- for (s <- messageStatuses.values if s.connectionManagerId == sendingConnectionManagerId) {
466- logInfo(" Notifying " + s)
467- s.synchronized {
468- s.attempted = true
469- s.acked = false
470- s.markDone()
461+ assert(sendingConnectionManagerId == remoteConnectionManagerId)
462+
463+ messageStatuses.synchronized {
464+ for (s <- messageStatuses.values if s.connectionManagerId == sendingConnectionManagerId) {
465+ logInfo(" Notifying " + s)
466+ s.synchronized {
467+ s.attempted = true
468+ s.acked = false
469+ s.markDone()
470+ }
471471 }
472- }
473472
474- messageStatuses.retain((i, status) => {
475- status.connectionManagerId != sendingConnectionManagerId
476- })
477- }
473+ messageStatuses.retain((i, status) => {
474+ status.connectionManagerId != sendingConnectionManagerId
475+ })
476+ }
477+ case _ => logError(" Unsupported type of connection." )
478478 }
479479 } finally {
480480 // So that the selection keys can be removed.
@@ -517,13 +517,13 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf,
517517 logDebug(" Client sasl completed for id: " + waitingConn.connectionId)
518518 connectionsAwaitingSasl -= waitingConn.connectionId
519519 waitingConn.getAuthenticated().synchronized {
520- waitingConn.getAuthenticated().notifyAll();
520+ waitingConn.getAuthenticated().notifyAll()
521521 }
522522 return
523523 } else {
524524 var replyToken : Array [Byte ] = null
525525 try {
526- replyToken = waitingConn.sparkSaslClient.saslResponse(securityMsg.getToken);
526+ replyToken = waitingConn.sparkSaslClient.saslResponse(securityMsg.getToken)
527527 if (waitingConn.isSaslComplete()) {
528528 logDebug(" Client sasl completed after evaluate for id: " + waitingConn.connectionId)
529529 connectionsAwaitingSasl -= waitingConn.connectionId
@@ -533,7 +533,7 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf,
533533 return
534534 }
535535 val securityMsgResp = SecurityMessage .fromResponse(replyToken,
536- securityMsg.getConnectionId.toString() )
536+ securityMsg.getConnectionId.toString)
537537 val message = securityMsgResp.toBufferMessage
538538 if (message == null ) throw new Exception (" Error creating security message" )
539539 sendSecurityMessage(waitingConn.getRemoteConnectionManagerId(), message)
@@ -630,13 +630,13 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf,
630630 case bufferMessage : BufferMessage => {
631631 if (authEnabled) {
632632 val res = handleAuthentication(connection, bufferMessage)
633- if (res == true ) {
633+ if (res) {
634634 // message was security negotiation so skip the rest
635635 logDebug(" After handleAuth result was true, returning" )
636636 return
637637 }
638638 }
639- if (bufferMessage.hasAckId) {
639+ if (bufferMessage.hasAckId() ) {
640640 val sentMessageStatus = messageStatuses.synchronized {
641641 messageStatuses.get(bufferMessage.ackId) match {
642642 case Some (status) => {
@@ -646,7 +646,6 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf,
646646 case None => {
647647 throw new Exception (" Could not find reference for received ack message " +
648648 message.id)
649- null
650649 }
651650 }
652651 }
@@ -668,7 +667,7 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf,
668667 if (ackMessage.isDefined) {
669668 if (! ackMessage.get.isInstanceOf [BufferMessage ]) {
670669 logDebug(" Response to " + bufferMessage + " is not a buffer message, it is of type "
671- + ackMessage.get.getClass() )
670+ + ackMessage.get.getClass)
672671 } else if (! ackMessage.get.asInstanceOf [BufferMessage ].hasAckId) {
673672 logDebug(" Response to " + bufferMessage + " does not have ack id set" )
674673 ackMessage.get.asInstanceOf [BufferMessage ].ackId = bufferMessage.id
0 commit comments