diff --git a/core/src/main/scala/org/apache/spark/network/Connection.scala b/core/src/main/scala/org/apache/spark/network/Connection.scala index 3b6298a26d7c5..5285ec82c1b64 100644 --- a/core/src/main/scala/org/apache/spark/network/Connection.scala +++ b/core/src/main/scala/org/apache/spark/network/Connection.scala @@ -17,11 +17,6 @@ package org.apache.spark.network -import org.apache.spark._ -import org.apache.spark.SparkSaslServer - -import scala.collection.mutable.{HashMap, Queue, ArrayBuffer} - import java.net._ import java.nio._ import java.nio.channels._ @@ -41,7 +36,7 @@ abstract class Connection(val channel: SocketChannel, val selector: Selector, def this(channel_ : SocketChannel, selector_ : Selector, id_ : ConnectionId) = { this(channel_, selector_, ConnectionManagerId.fromSocketAddress( - channel_.socket.getRemoteSocketAddress().asInstanceOf[InetSocketAddress]), id_) + channel_.socket.getRemoteSocketAddress.asInstanceOf[InetSocketAddress]), id_) } channel.configureBlocking(false) @@ -89,7 +84,7 @@ abstract class Connection(val channel: SocketChannel, val selector: Selector, private def disposeSasl() { if (sparkSaslServer != null) { - sparkSaslServer.dispose(); + sparkSaslServer.dispose() } if (sparkSaslClient != null) { @@ -328,15 +323,13 @@ class SendingConnection(val address: InetSocketAddress, selector_ : Selector, // Is highly unlikely unless there was an unclean close of socket, etc registerInterest() logInfo("Connected to [" + address + "], " + outbox.messages.size + " messages pending") - true } catch { case e: Exception => { logWarning("Error finishing connection to " + address, e) callOnExceptionCallback(e) - // ignore - return true } } + true } override def write(): Boolean = { @@ -546,7 +539,7 @@ private[spark] class ReceivingConnection( /* println("Filled buffer at " + System.currentTimeMillis) */ val bufferMessage = inbox.getMessageForChunk(currentChunk).get if (bufferMessage.isCompletelyReceived) { - bufferMessage.flip + bufferMessage.flip() bufferMessage.finishTime = System.currentTimeMillis logDebug("Finished receiving [" + bufferMessage + "] from " + "[" + getRemoteConnectionManagerId() + "] in " + bufferMessage.timeTaken) diff --git a/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala b/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala index cf1c985c2fff9..8a1cdb812962e 100644 --- a/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala +++ b/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala @@ -249,7 +249,7 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf, def run() { try { while(!selectorThread.isInterrupted) { - while (! registerRequests.isEmpty) { + while (!registerRequests.isEmpty) { val conn: SendingConnection = registerRequests.dequeue() addListeners(conn) conn.connect() @@ -308,7 +308,7 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf, // Some keys within the selectors list are invalid/closed. clear them. val allKeys = selector.keys().iterator() - while (allKeys.hasNext()) { + while (allKeys.hasNext) { val key = allKeys.next() try { if (! key.isValid) { @@ -341,7 +341,7 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf, if (0 != selectedKeysCount) { val selectedKeys = selector.selectedKeys().iterator() - while (selectedKeys.hasNext()) { + while (selectedKeys.hasNext) { val key = selectedKeys.next selectedKeys.remove() try { @@ -419,62 +419,63 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf, connectionsByKey -= connection.key try { - if (connection.isInstanceOf[SendingConnection]) { - val sendingConnection = connection.asInstanceOf[SendingConnection] - val sendingConnectionManagerId = sendingConnection.getRemoteConnectionManagerId() - logInfo("Removing SendingConnection to " + sendingConnectionManagerId) - - connectionsById -= sendingConnectionManagerId - connectionsAwaitingSasl -= connection.connectionId + connection match { + case sendingConnection: SendingConnection => + val sendingConnectionManagerId = sendingConnection.getRemoteConnectionManagerId() + logInfo("Removing SendingConnection to " + sendingConnectionManagerId) + + connectionsById -= sendingConnectionManagerId + connectionsAwaitingSasl -= connection.connectionId + + messageStatuses.synchronized { + messageStatuses.values.filter(_.connectionManagerId == sendingConnectionManagerId) + .foreach(status => { + logInfo("Notifying " + status) + status.synchronized { + status.attempted = true + status.acked = false + status.markDone() + } + }) - messageStatuses.synchronized { - messageStatuses - .values.filter(_.connectionManagerId == sendingConnectionManagerId).foreach(status => { - logInfo("Notifying " + status) - status.synchronized { - status.attempted = true - status.acked = false - status.markDone() - } + messageStatuses.retain((i, status) => { + status.connectionManagerId != sendingConnectionManagerId }) + } + case receivingConnection: ReceivingConnection => + val remoteConnectionManagerId = receivingConnection.getRemoteConnectionManagerId() + logInfo("Removing ReceivingConnection to " + remoteConnectionManagerId) - messageStatuses.retain((i, status) => { - status.connectionManagerId != sendingConnectionManagerId - }) - } - } else if (connection.isInstanceOf[ReceivingConnection]) { - val receivingConnection = connection.asInstanceOf[ReceivingConnection] - val remoteConnectionManagerId = receivingConnection.getRemoteConnectionManagerId() - logInfo("Removing ReceivingConnection to " + remoteConnectionManagerId) - - val sendingConnectionOpt = connectionsById.get(remoteConnectionManagerId) - if (! sendingConnectionOpt.isDefined) { - logError("Corresponding SendingConnectionManagerId not found") - return - } + val sendingConnectionOpt = connectionsById.get(remoteConnectionManagerId) + if (!sendingConnectionOpt.isDefined) { + logError("Corresponding SendingConnectionManagerId not found") + return + } - val sendingConnection = sendingConnectionOpt.get - connectionsById -= remoteConnectionManagerId - sendingConnection.close() + val sendingConnection = sendingConnectionOpt.get + connectionsById -= remoteConnectionManagerId + sendingConnection.close() - val sendingConnectionManagerId = sendingConnection.getRemoteConnectionManagerId() + val sendingConnectionManagerId = sendingConnection.getRemoteConnectionManagerId() - assert (sendingConnectionManagerId == remoteConnectionManagerId) + assert(sendingConnectionManagerId == remoteConnectionManagerId) - messageStatuses.synchronized { - for (s <- messageStatuses.values if s.connectionManagerId == sendingConnectionManagerId) { - logInfo("Notifying " + s) - s.synchronized { - s.attempted = true - s.acked = false - s.markDone() + messageStatuses.synchronized { + for (s <- messageStatuses.values + if s.connectionManagerId == sendingConnectionManagerId) { + logInfo("Notifying " + s) + s.synchronized { + s.attempted = true + s.acked = false + s.markDone() + } } - } - messageStatuses.retain((i, status) => { - status.connectionManagerId != sendingConnectionManagerId - }) - } + messageStatuses.retain((i, status) => { + status.connectionManagerId != sendingConnectionManagerId + }) + } + case _ => logError("Unsupported type of connection.") } } finally { // So that the selection keys can be removed. @@ -517,13 +518,13 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf, logDebug("Client sasl completed for id: " + waitingConn.connectionId) connectionsAwaitingSasl -= waitingConn.connectionId waitingConn.getAuthenticated().synchronized { - waitingConn.getAuthenticated().notifyAll(); + waitingConn.getAuthenticated().notifyAll() } return } else { var replyToken : Array[Byte] = null try { - replyToken = waitingConn.sparkSaslClient.saslResponse(securityMsg.getToken); + replyToken = waitingConn.sparkSaslClient.saslResponse(securityMsg.getToken) if (waitingConn.isSaslComplete()) { logDebug("Client sasl completed after evaluate for id: " + waitingConn.connectionId) connectionsAwaitingSasl -= waitingConn.connectionId @@ -533,7 +534,7 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf, return } val securityMsgResp = SecurityMessage.fromResponse(replyToken, - securityMsg.getConnectionId.toString()) + securityMsg.getConnectionId.toString) val message = securityMsgResp.toBufferMessage if (message == null) throw new Exception("Error creating security message") sendSecurityMessage(waitingConn.getRemoteConnectionManagerId(), message) @@ -630,13 +631,13 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf, case bufferMessage: BufferMessage => { if (authEnabled) { val res = handleAuthentication(connection, bufferMessage) - if (res == true) { + if (res) { // message was security negotiation so skip the rest logDebug("After handleAuth result was true, returning") return } } - if (bufferMessage.hasAckId) { + if (bufferMessage.hasAckId()) { val sentMessageStatus = messageStatuses.synchronized { messageStatuses.get(bufferMessage.ackId) match { case Some(status) => { @@ -646,7 +647,6 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf, case None => { throw new Exception("Could not find reference for received ack message " + message.id) - null } } } @@ -668,7 +668,7 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf, if (ackMessage.isDefined) { if (!ackMessage.get.isInstanceOf[BufferMessage]) { logDebug("Response to " + bufferMessage + " is not a buffer message, it is of type " - + ackMessage.get.getClass()) + + ackMessage.get.getClass) } else if (!ackMessage.get.asInstanceOf[BufferMessage].hasAckId) { logDebug("Response to " + bufferMessage + " does not have ack id set") ackMessage.get.asInstanceOf[BufferMessage].ackId = bufferMessage.id diff --git a/core/src/main/scala/org/apache/spark/network/ConnectionManagerId.scala b/core/src/main/scala/org/apache/spark/network/ConnectionManagerId.scala index b82edb6850d23..57f7586883af1 100644 --- a/core/src/main/scala/org/apache/spark/network/ConnectionManagerId.scala +++ b/core/src/main/scala/org/apache/spark/network/ConnectionManagerId.scala @@ -32,6 +32,6 @@ private[spark] case class ConnectionManagerId(host: String, port: Int) { private[spark] object ConnectionManagerId { def fromSocketAddress(socketAddress: InetSocketAddress): ConnectionManagerId = { - new ConnectionManagerId(socketAddress.getHostName(), socketAddress.getPort()) + new ConnectionManagerId(socketAddress.getHostName, socketAddress.getPort) } }