From 85b24f70eae2a0ff06874a9b1f656a75550f04a9 Mon Sep 17 00:00:00 2001 From: Henry Saputra Date: Fri, 20 Jun 2014 11:30:57 -0700 Subject: [PATCH 1/2] Cleanup on Connection and ConnectionManager classes part 2 while I was working at the code there to help IDE: 1. Remove unused imports 2. Remove parentheses in method calls that do not have side affect. 3. Add parentheses in method calls that do have side effect. 4. Change if-else check (via isInstanceOf) for Connection class type with Scala expression for consitency and cleanliness. 5. Remove semicolon 6. Remove extra spaces. --- .../org/apache/spark/network/Connection.scala | 15 +-- .../spark/network/ConnectionManager.scala | 105 +++++++++--------- .../spark/network/ConnectionManagerId.scala | 2 +- 3 files changed, 57 insertions(+), 65 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/network/Connection.scala b/core/src/main/scala/org/apache/spark/network/Connection.scala index 3b6298a26d7c5..5285ec82c1b64 100644 --- a/core/src/main/scala/org/apache/spark/network/Connection.scala +++ b/core/src/main/scala/org/apache/spark/network/Connection.scala @@ -17,11 +17,6 @@ package org.apache.spark.network -import org.apache.spark._ -import org.apache.spark.SparkSaslServer - -import scala.collection.mutable.{HashMap, Queue, ArrayBuffer} - import java.net._ import java.nio._ import java.nio.channels._ @@ -41,7 +36,7 @@ abstract class Connection(val channel: SocketChannel, val selector: Selector, def this(channel_ : SocketChannel, selector_ : Selector, id_ : ConnectionId) = { this(channel_, selector_, ConnectionManagerId.fromSocketAddress( - channel_.socket.getRemoteSocketAddress().asInstanceOf[InetSocketAddress]), id_) + channel_.socket.getRemoteSocketAddress.asInstanceOf[InetSocketAddress]), id_) } channel.configureBlocking(false) @@ -89,7 +84,7 @@ abstract class Connection(val channel: SocketChannel, val selector: Selector, private def disposeSasl() { if (sparkSaslServer != null) { - sparkSaslServer.dispose(); + sparkSaslServer.dispose() } if (sparkSaslClient != null) { @@ -328,15 +323,13 @@ class SendingConnection(val address: InetSocketAddress, selector_ : Selector, // Is highly unlikely unless there was an unclean close of socket, etc registerInterest() logInfo("Connected to [" + address + "], " + outbox.messages.size + " messages pending") - true } catch { case e: Exception => { logWarning("Error finishing connection to " + address, e) callOnExceptionCallback(e) - // ignore - return true } } + true } override def write(): Boolean = { @@ -546,7 +539,7 @@ private[spark] class ReceivingConnection( /* println("Filled buffer at " + System.currentTimeMillis) */ val bufferMessage = inbox.getMessageForChunk(currentChunk).get if (bufferMessage.isCompletelyReceived) { - bufferMessage.flip + bufferMessage.flip() bufferMessage.finishTime = System.currentTimeMillis logDebug("Finished receiving [" + bufferMessage + "] from " + "[" + getRemoteConnectionManagerId() + "] in " + bufferMessage.timeTaken) diff --git a/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala b/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala index cf1c985c2fff9..8eda120d70f26 100644 --- a/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala +++ b/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala @@ -249,7 +249,7 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf, def run() { try { while(!selectorThread.isInterrupted) { - while (! registerRequests.isEmpty) { + while (!registerRequests.isEmpty) { val conn: SendingConnection = registerRequests.dequeue() addListeners(conn) conn.connect() @@ -308,7 +308,7 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf, // Some keys within the selectors list are invalid/closed. clear them. val allKeys = selector.keys().iterator() - while (allKeys.hasNext()) { + while (allKeys.hasNext) { val key = allKeys.next() try { if (! key.isValid) { @@ -341,7 +341,7 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf, if (0 != selectedKeysCount) { val selectedKeys = selector.selectedKeys().iterator() - while (selectedKeys.hasNext()) { + while (selectedKeys.hasNext) { val key = selectedKeys.next selectedKeys.remove() try { @@ -419,62 +419,62 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf, connectionsByKey -= connection.key try { - if (connection.isInstanceOf[SendingConnection]) { - val sendingConnection = connection.asInstanceOf[SendingConnection] - val sendingConnectionManagerId = sendingConnection.getRemoteConnectionManagerId() - logInfo("Removing SendingConnection to " + sendingConnectionManagerId) + connection match { + case sendingConnection: SendingConnection => + val sendingConnectionManagerId = sendingConnection.getRemoteConnectionManagerId() + logInfo("Removing SendingConnection to " + sendingConnectionManagerId) - connectionsById -= sendingConnectionManagerId - connectionsAwaitingSasl -= connection.connectionId + connectionsById -= sendingConnectionManagerId + connectionsAwaitingSasl -= connection.connectionId - messageStatuses.synchronized { - messageStatuses - .values.filter(_.connectionManagerId == sendingConnectionManagerId).foreach(status => { + messageStatuses.synchronized { + messageStatuses + .values.filter(_.connectionManagerId == sendingConnectionManagerId).foreach(status => { logInfo("Notifying " + status) status.synchronized { - status.attempted = true - status.acked = false - status.markDone() + status.attempted = true + status.acked = false + status.markDone() } }) - messageStatuses.retain((i, status) => { - status.connectionManagerId != sendingConnectionManagerId - }) - } - } else if (connection.isInstanceOf[ReceivingConnection]) { - val receivingConnection = connection.asInstanceOf[ReceivingConnection] - val remoteConnectionManagerId = receivingConnection.getRemoteConnectionManagerId() - logInfo("Removing ReceivingConnection to " + remoteConnectionManagerId) - - val sendingConnectionOpt = connectionsById.get(remoteConnectionManagerId) - if (! sendingConnectionOpt.isDefined) { - logError("Corresponding SendingConnectionManagerId not found") - return - } + messageStatuses.retain((i, status) => { + status.connectionManagerId != sendingConnectionManagerId + }) + } + case receivingConnection: ReceivingConnection => + val remoteConnectionManagerId = receivingConnection.getRemoteConnectionManagerId() + logInfo("Removing ReceivingConnection to " + remoteConnectionManagerId) - val sendingConnection = sendingConnectionOpt.get - connectionsById -= remoteConnectionManagerId - sendingConnection.close() + val sendingConnectionOpt = connectionsById.get(remoteConnectionManagerId) + if (!sendingConnectionOpt.isDefined) { + logError("Corresponding SendingConnectionManagerId not found") + return + } - val sendingConnectionManagerId = sendingConnection.getRemoteConnectionManagerId() + val sendingConnection = sendingConnectionOpt.get + connectionsById -= remoteConnectionManagerId + sendingConnection.close() - assert (sendingConnectionManagerId == remoteConnectionManagerId) + val sendingConnectionManagerId = sendingConnection.getRemoteConnectionManagerId() - messageStatuses.synchronized { - for (s <- messageStatuses.values if s.connectionManagerId == sendingConnectionManagerId) { - logInfo("Notifying " + s) - s.synchronized { - s.attempted = true - s.acked = false - s.markDone() + assert(sendingConnectionManagerId == remoteConnectionManagerId) + + messageStatuses.synchronized { + for (s <- messageStatuses.values if s.connectionManagerId == sendingConnectionManagerId) { + logInfo("Notifying " + s) + s.synchronized { + s.attempted = true + s.acked = false + s.markDone() + } } - } - messageStatuses.retain((i, status) => { - status.connectionManagerId != sendingConnectionManagerId - }) - } + messageStatuses.retain((i, status) => { + status.connectionManagerId != sendingConnectionManagerId + }) + } + case _ => logError("Unsupported type of connection.") } } finally { // So that the selection keys can be removed. @@ -517,13 +517,13 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf, logDebug("Client sasl completed for id: " + waitingConn.connectionId) connectionsAwaitingSasl -= waitingConn.connectionId waitingConn.getAuthenticated().synchronized { - waitingConn.getAuthenticated().notifyAll(); + waitingConn.getAuthenticated().notifyAll() } return } else { var replyToken : Array[Byte] = null try { - replyToken = waitingConn.sparkSaslClient.saslResponse(securityMsg.getToken); + replyToken = waitingConn.sparkSaslClient.saslResponse(securityMsg.getToken) if (waitingConn.isSaslComplete()) { logDebug("Client sasl completed after evaluate for id: " + waitingConn.connectionId) connectionsAwaitingSasl -= waitingConn.connectionId @@ -533,7 +533,7 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf, return } val securityMsgResp = SecurityMessage.fromResponse(replyToken, - securityMsg.getConnectionId.toString()) + securityMsg.getConnectionId.toString) val message = securityMsgResp.toBufferMessage if (message == null) throw new Exception("Error creating security message") sendSecurityMessage(waitingConn.getRemoteConnectionManagerId(), message) @@ -630,13 +630,13 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf, case bufferMessage: BufferMessage => { if (authEnabled) { val res = handleAuthentication(connection, bufferMessage) - if (res == true) { + if (res) { // message was security negotiation so skip the rest logDebug("After handleAuth result was true, returning") return } } - if (bufferMessage.hasAckId) { + if (bufferMessage.hasAckId()) { val sentMessageStatus = messageStatuses.synchronized { messageStatuses.get(bufferMessage.ackId) match { case Some(status) => { @@ -646,7 +646,6 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf, case None => { throw new Exception("Could not find reference for received ack message " + message.id) - null } } } @@ -668,7 +667,7 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf, if (ackMessage.isDefined) { if (!ackMessage.get.isInstanceOf[BufferMessage]) { logDebug("Response to " + bufferMessage + " is not a buffer message, it is of type " - + ackMessage.get.getClass()) + + ackMessage.get.getClass) } else if (!ackMessage.get.asInstanceOf[BufferMessage].hasAckId) { logDebug("Response to " + bufferMessage + " does not have ack id set") ackMessage.get.asInstanceOf[BufferMessage].ackId = bufferMessage.id diff --git a/core/src/main/scala/org/apache/spark/network/ConnectionManagerId.scala b/core/src/main/scala/org/apache/spark/network/ConnectionManagerId.scala index b82edb6850d23..57f7586883af1 100644 --- a/core/src/main/scala/org/apache/spark/network/ConnectionManagerId.scala +++ b/core/src/main/scala/org/apache/spark/network/ConnectionManagerId.scala @@ -32,6 +32,6 @@ private[spark] case class ConnectionManagerId(host: String, port: Int) { private[spark] object ConnectionManagerId { def fromSocketAddress(socketAddress: InetSocketAddress): ConnectionManagerId = { - new ConnectionManagerId(socketAddress.getHostName(), socketAddress.getPort()) + new ConnectionManagerId(socketAddress.getHostName, socketAddress.getPort) } } From 4be6906946e4c8138f5f68685e99d26480265d8b Mon Sep 17 00:00:00 2001 From: Henry Saputra Date: Mon, 23 Jun 2014 11:44:06 -0700 Subject: [PATCH 2/2] Fix Spark Scala style for line over 100 chars. --- .../spark/network/ConnectionManager.scala | 21 ++++++++++--------- 1 file changed, 11 insertions(+), 10 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala b/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala index 8eda120d70f26..8a1cdb812962e 100644 --- a/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala +++ b/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala @@ -428,15 +428,15 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf, connectionsAwaitingSasl -= connection.connectionId messageStatuses.synchronized { - messageStatuses - .values.filter(_.connectionManagerId == sendingConnectionManagerId).foreach(status => { - logInfo("Notifying " + status) - status.synchronized { - status.attempted = true - status.acked = false - status.markDone() - } - }) + messageStatuses.values.filter(_.connectionManagerId == sendingConnectionManagerId) + .foreach(status => { + logInfo("Notifying " + status) + status.synchronized { + status.attempted = true + status.acked = false + status.markDone() + } + }) messageStatuses.retain((i, status) => { status.connectionManagerId != sendingConnectionManagerId @@ -461,7 +461,8 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf, assert(sendingConnectionManagerId == remoteConnectionManagerId) messageStatuses.synchronized { - for (s <- messageStatuses.values if s.connectionManagerId == sendingConnectionManagerId) { + for (s <- messageStatuses.values + if s.connectionManagerId == sendingConnectionManagerId) { logInfo("Notifying " + s) s.synchronized { s.attempted = true