From 053bc59fda89370e067e7f83afb572e6bb131c7b Mon Sep 17 00:00:00 2001 From: Kay Ousterhout Date: Mon, 3 Mar 2014 16:01:10 -0800 Subject: [PATCH] Remove broken/unused Connection.getChunkFIFO method. This method appears to be broken -- since it never removes anything from messages, and it adds new messages to it, the while loop is an infinite loop. The method also does not appear to have ever been used since the code was added in 2012, so this commit removes it. --- .../org/apache/spark/network/Connection.scala | 36 ++----------------- 1 file changed, 2 insertions(+), 34 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/network/Connection.scala b/core/src/main/scala/org/apache/spark/network/Connection.scala index f2e3c1a14ecc6..8219a185ea983 100644 --- a/core/src/main/scala/org/apache/spark/network/Connection.scala +++ b/core/src/main/scala/org/apache/spark/network/Connection.scala @@ -171,7 +171,7 @@ class SendingConnection(val address: InetSocketAddress, selector_ : Selector, remoteId_ : ConnectionManagerId) extends Connection(SocketChannel.open, selector_, remoteId_) { - private class Outbox(fair: Int = 0) { + private class Outbox { val messages = new Queue[Message]() val defaultChunkSize = 65536 //32768 //16384 var nextMessageToBeUsed = 0 @@ -186,38 +186,6 @@ class SendingConnection(val address: InetSocketAddress, selector_ : Selector, } def getChunk(): Option[MessageChunk] = { - fair match { - case 0 => getChunkFIFO() - case 1 => getChunkRR() - case _ => throw new Exception("Unexpected fairness policy in outbox") - } - } - - private def getChunkFIFO(): Option[MessageChunk] = { - /*logInfo("Using FIFO")*/ - messages.synchronized { - while (!messages.isEmpty) { - val message = messages(0) - val chunk = message.getChunkForSending(defaultChunkSize) - if (chunk.isDefined) { - messages += message // this is probably incorrect, it wont work as fifo - if (!message.started) { - logDebug("Starting to send [" + message + "]") - message.started = true - message.startTime = System.currentTimeMillis - } - return chunk - } else { - message.finishTime = System.currentTimeMillis - logDebug("Finished sending [" + message + "] to [" + getRemoteConnectionManagerId() + - "] in " + message.timeTaken ) - } - } - } - None - } - - private def getChunkRR(): Option[MessageChunk] = { messages.synchronized { while (!messages.isEmpty) { /*nextMessageToBeUsed = nextMessageToBeUsed % messages.size */ @@ -249,7 +217,7 @@ class SendingConnection(val address: InetSocketAddress, selector_ : Selector, // outbox is used as a lock - ensure that it is always used as a leaf (since methods which // lock it are invoked in context of other locks) - private val outbox = new Outbox(1) + private val outbox = new Outbox() /* This is orthogonal to whether we have pending bytes to write or not - and satisfies a slightly different purpose. This flag is to see if we need to force reregister for write even when we