Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 2 additions & 34 deletions core/src/main/scala/org/apache/spark/network/Connection.scala
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ class SendingConnection(val address: InetSocketAddress, selector_ : Selector,
remoteId_ : ConnectionManagerId)
extends Connection(SocketChannel.open, selector_, remoteId_) {

private class Outbox(fair: Int = 0) {
private class Outbox {
val messages = new Queue[Message]()
val defaultChunkSize = 65536 //32768 //16384
var nextMessageToBeUsed = 0
Expand All @@ -186,38 +186,6 @@ class SendingConnection(val address: InetSocketAddress, selector_ : Selector,
}

def getChunk(): Option[MessageChunk] = {
fair match {
case 0 => getChunkFIFO()
case 1 => getChunkRR()
case _ => throw new Exception("Unexpected fairness policy in outbox")
}
}

private def getChunkFIFO(): Option[MessageChunk] = {
/*logInfo("Using FIFO")*/
messages.synchronized {
while (!messages.isEmpty) {
val message = messages(0)
val chunk = message.getChunkForSending(defaultChunkSize)
if (chunk.isDefined) {
messages += message // this is probably incorrect, it wont work as fifo
if (!message.started) {
logDebug("Starting to send [" + message + "]")
message.started = true
message.startTime = System.currentTimeMillis
}
return chunk
} else {
message.finishTime = System.currentTimeMillis
logDebug("Finished sending [" + message + "] to [" + getRemoteConnectionManagerId() +
"] in " + message.timeTaken )
}
}
}
None
}

private def getChunkRR(): Option[MessageChunk] = {
messages.synchronized {
while (!messages.isEmpty) {
/*nextMessageToBeUsed = nextMessageToBeUsed % messages.size */
Expand Down Expand Up @@ -249,7 +217,7 @@ class SendingConnection(val address: InetSocketAddress, selector_ : Selector,

// outbox is used as a lock - ensure that it is always used as a leaf (since methods which
// lock it are invoked in context of other locks)
private val outbox = new Outbox(1)
private val outbox = new Outbox()
/*
This is orthogonal to whether we have pending bytes to write or not - and satisfies a slightly
different purpose. This flag is to see if we need to force reregister for write even when we
Expand Down