
Commit 8a1046e (parent 2c6b1e1)

Code review feedback:
1. Renamed the package from cm to nio.
2. Refined the BlockTransferService and ManagedBuffer interfaces.

26 files changed: 141 additions, 127 deletions

core/src/main/scala/org/apache/spark/SparkEnv.scala
Lines changed: 2 additions & 2 deletions

```diff
@@ -32,7 +32,7 @@ import org.apache.spark.api.python.PythonWorkerFactory
 import org.apache.spark.broadcast.BroadcastManager
 import org.apache.spark.metrics.MetricsSystem
 import org.apache.spark.network.BlockTransferService
-import org.apache.spark.network.cm.CMBlockTransferService
+import org.apache.spark.network.nio.NioBlockTransferService
 import org.apache.spark.scheduler.LiveListenerBus
 import org.apache.spark.serializer.Serializer
 import org.apache.spark.shuffle.{ShuffleMemoryManager, ShuffleManager}
@@ -226,7 +226,7 @@ object SparkEnv extends Logging {
 
     val shuffleMemoryManager = new ShuffleMemoryManager(conf)
 
-    val blockTransferService = new CMBlockTransferService(conf, securityManager)
+    val blockTransferService = new NioBlockTransferService(conf, securityManager)
 
     val blockManagerMaster = new BlockManagerMaster(registerOrLookup(
       "BlockManagerMaster",
```

core/src/main/scala/org/apache/spark/network/BlockFetchingListener.scala
Lines changed: 1 addition & 1 deletion

```diff
@@ -31,7 +31,7 @@ trait BlockFetchingListener extends EventListener {
   def onBlockFetchSuccess(blockId: String, data: ManagedBuffer): Unit
 
   /**
-   * Called upon failures.
+   * Called upon failures. For each failure, this is called only once (i.e. not once per block).
    */
   def onBlockFetchFailure(exception: Throwable): Unit
 }
```
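The tightened contract matters for implementers: success fires once per block, failure at most once for the whole request. A minimal conforming listener might look like the sketch below (the class name and println logging are illustrative, not part of this commit):

```scala
import org.apache.spark.network.{BlockFetchingListener, ManagedBuffer}

// Illustrative listener: counts per-block successes; the failure callback
// fires at most once per fetch request, not once per outstanding block.
class CountingListener extends BlockFetchingListener {
  @volatile private var fetched = 0

  override def onBlockFetchSuccess(blockId: String, data: ManagedBuffer): Unit = {
    fetched += 1
    println(s"fetched $blockId (${data.size} bytes)")
  }

  override def onBlockFetchFailure(exception: Throwable): Unit = {
    println(s"fetch failed after $fetched blocks: ${exception.getMessage}")
  }
}
```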

core/src/main/scala/org/apache/spark/network/BlockTransferService.scala
Lines changed: 26 additions & 9 deletions

```diff
@@ -17,6 +17,9 @@
 
 package org.apache.spark.network
 
+import scala.concurrent.{Await, Future}
+import scala.concurrent.duration.Duration
+
 import org.apache.spark.storage.StorageLevel
 
 
@@ -48,9 +51,11 @@ abstract class BlockTransferService {
    * available only after [[init]] is invoked.
    *
    * Note that [[BlockFetchingListener.onBlockFetchSuccess]] is called once per block,
-   * while [[BlockFetchingListener.onBlockFetchSuccess]] is called once per failure.
+   * while [[BlockFetchingListener.onBlockFetchFailure]] is called once per failure (not per block).
    *
-   * This takes a sequence so the implementation can batch requests.
+   * Note that this API takes a sequence so the implementation can batch requests, and does not
+   * return a future so the underlying implementation can invoke onBlockFetchSuccess as soon as
+   * the data of a block is fetched, rather than waiting for all blocks to be fetched.
    */
   def fetchBlocks(
       hostName: String,
@@ -59,12 +64,21 @@ abstract class BlockTransferService {
       listener: BlockFetchingListener): Unit
 
   /**
-   * Fetch a single block from a remote node, synchronously,
-   * available only after [[init]] is invoked.
+   * Upload a single block to a remote node, available only after [[init]] is invoked.
    */
-  def fetchBlock(hostName: String, port: Int, blockId: String): ManagedBuffer = {
-    // TODO(rxin): Add timeout?
+  def uploadBlock(
+      hostname: String,
+      port: Int,
+      blockId: String,
+      blockData: ManagedBuffer,
+      level: StorageLevel): Future[Unit]
 
+  /**
+   * A special case of [[fetchBlocks]], since it only fetches one block and is blocking.
+   *
+   * It is also only available after [[init]] is invoked.
+   */
+  def fetchBlockSync(hostName: String, port: Int, blockId: String): ManagedBuffer = {
     // A monitor for the thread to wait on.
     val lock = new Object
     @volatile var result: Either[ManagedBuffer, Throwable] = null
@@ -103,12 +117,15 @@
   /**
    * Upload a single block to a remote node, available only after [[init]] is invoked.
    *
-   * This call blocks until the upload completes, or throws an exception upon failures.
+   * This method is similar to [[uploadBlock]], except this one blocks the thread
+   * until the upload finishes.
    */
-  def uploadBlock(
+  def uploadBlockSync(
       hostname: String,
       port: Int,
       blockId: String,
       blockData: ManagedBuffer,
-      level: StorageLevel): Unit
+      level: StorageLevel): Unit = {
+    Await.result(uploadBlock(hostname, port, blockId, blockData, level), Duration.Inf)
+  }
 }
```
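To make the new async/sync split concrete, here is a hedged usage sketch. `service`, `host`, `port`, and the block ids are stand-ins, not from this commit; only methods whose signatures the diff shows in full are called:

```scala
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

import org.apache.spark.network.{BlockTransferService, ManagedBuffer}
import org.apache.spark.storage.StorageLevel

// Illustrative caller of the revised interface (all names are stand-ins).
def mirrorBlock(service: BlockTransferService, host: String, port: Int): Unit = {
  // Blocking single-block fetch -- the renamed convenience wrapper over fetchBlocks.
  val data: ManagedBuffer = service.fetchBlockSync(host, port, "shuffle_0_0_0")

  // Asynchronous upload returns a Future, so callers can chain follow-up work...
  val upload: Future[Unit] =
    service.uploadBlock(host, port, "rdd_1_0", data, StorageLevel.MEMORY_ONLY)
  upload.foreach(_ => println("upload finished"))

  // ...or block on completion: uploadBlockSync simply awaits that Future
  // with an infinite timeout.
  service.uploadBlockSync(host, port, "rdd_1_0", data, StorageLevel.MEMORY_ONLY)
}
```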

core/src/main/scala/org/apache/spark/network/ManagedBuffer.scala
Lines changed: 39 additions & 34 deletions

```diff
@@ -17,85 +17,90 @@
 
 package org.apache.spark.network
 
-import java.io.{RandomAccessFile, File, FileInputStream, InputStream}
+import java.io.{FileInputStream, RandomAccessFile, File, InputStream}
 import java.nio.ByteBuffer
 import java.nio.channels.FileChannel.MapMode
 
-import io.netty.buffer.{ByteBufInputStream, ByteBuf, Unpooled}
-import io.netty.channel.DefaultFileRegion
+import io.netty.buffer.{ByteBufInputStream, ByteBuf}
 
-import org.apache.spark.storage.FileSegment
 import org.apache.spark.util.ByteBufferInputStream
 
 
 /**
- * Provides a buffer abstraction that allows pooling and reuse.
+ * This interface provides an immutable view for data in the form of bytes. The implementation
+ * should specify how the data is provided:
+ *
+ * - FileSegmentManagedBuffer: data backed by part of a file
+ * - NioByteBufferManagedBuffer: data backed by a NIO ByteBuffer
+ * - NettyByteBufManagedBuffer: data backed by a Netty ByteBuf
  */
-abstract class ManagedBuffer {
+sealed abstract class ManagedBuffer {
   // Note that all the methods are defined with parenthesis because their implementations can
   // have side effects (io operations).
 
-  def byteBuffer(): ByteBuffer
-
-  def fileSegment(): Option[FileSegment] = None
-
-  def inputStream(): InputStream = throw new UnsupportedOperationException
-
-  def release(): Unit = throw new UnsupportedOperationException
-
+  /** Number of bytes of the data. */
   def size: Long
 
-  private[network] def toNetty(): AnyRef
+  /**
+   * Exposes this buffer's data as an NIO ByteBuffer. Changing the position and limit of the
+   * returned ByteBuffer should not affect the content of this buffer.
+   */
+  def nioByteBuffer(): ByteBuffer
+
+  /**
+   * Exposes this buffer's data as an InputStream. The underlying implementation does not
+   * necessarily check for the length of bytes read, so the caller is responsible for making sure
+   * it does not go over the limit.
+   */
+  def inputStream(): InputStream
 }
 
 
 /**
- * A ManagedBuffer backed by a segment in a file.
+ * A [[ManagedBuffer]] backed by a segment in a file.
  */
-final class FileSegmentManagedBuffer(file: File, offset: Long, length: Long)
+final class FileSegmentManagedBuffer(val file: File, val offset: Long, val length: Long)
   extends ManagedBuffer {
 
   override def size: Long = length
 
-  override def byteBuffer(): ByteBuffer = {
+  override def nioByteBuffer(): ByteBuffer = {
     val channel = new RandomAccessFile(file, "r").getChannel
     channel.map(MapMode.READ_ONLY, offset, length)
   }
 
-  override private[network] def toNetty(): AnyRef = {
-    val fileChannel = new FileInputStream(file).getChannel
-    new DefaultFileRegion(fileChannel, offset, length)
+  override def inputStream(): InputStream = {
+    val is = new FileInputStream(file)
+    is.skip(offset)
+    is
   }
 }
 
 
 /**
- * A ManagedBuffer backed by [[java.nio.ByteBuffer]].
+ * A [[ManagedBuffer]] backed by [[java.nio.ByteBuffer]].
  */
 final class NioByteBufferManagedBuffer(buf: ByteBuffer) extends ManagedBuffer {
 
-  override def byteBuffer() = buf
-
-  override def inputStream() = new ByteBufferInputStream(buf)
-
   override def size: Long = buf.remaining()
 
-  override private[network] def toNetty(): AnyRef = Unpooled.wrappedBuffer(buf)
+  override def nioByteBuffer() = buf
+
+  override def inputStream() = new ByteBufferInputStream(buf)
 }
 
 
 /**
- * A ManagedBuffer backed by a Netty [[ByteBuf]].
+ * A [[ManagedBuffer]] backed by a Netty [[ByteBuf]].
  */
 final class NettyByteBufManagedBuffer(buf: ByteBuf) extends ManagedBuffer {
 
-  override def byteBuffer() = buf.nioBuffer()
-
-  override def inputStream() = new ByteBufInputStream(buf)
+  override def size: Long = buf.readableBytes()
 
-  override def release(): Unit = buf.release()
+  override def nioByteBuffer() = buf.nioBuffer()
 
-  override def size: Long = buf.readableBytes()
+  override def inputStream() = new ByteBufInputStream(buf)
 
-  override private[network] def toNetty(): AnyRef = buf
+  // TODO(rxin): Promote this to top level ManagedBuffer interface and add documentation for it.
+  def release(): Unit = buf.release()
 }
```
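A short sketch of the narrowed contract, using the NIO-backed implementation (the byte contents are illustrative):

```scala
import java.nio.ByteBuffer

import org.apache.spark.network.NioByteBufferManagedBuffer

// Wrap four bytes; size reflects the ByteBuffer's remaining bytes.
val buffer = new NioByteBufferManagedBuffer(ByteBuffer.wrap(Array[Byte](1, 2, 3, 4)))
assert(buffer.size == 4)

// The same data is reachable through either view.
println(s"first byte: ${buffer.inputStream().read()}")  // prints 1
```

Note the caveat in the new doc comment: inputStream() on FileSegmentManagedBuffer only skips to the offset and does not bound reads at length, so callers must track the limit themselves.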

core/src/main/scala/org/apache/spark/network/cm/BlockMessage.scala renamed to core/src/main/scala/org/apache/spark/network/nio/BlockMessage.scala
Lines changed: 4 additions & 15 deletions

```diff
@@ -15,19 +15,20 @@
  * limitations under the License.
  */
 
-package org.apache.spark.network.cm
+package org.apache.spark.network.nio
 
 import java.nio.ByteBuffer
 
 import org.apache.spark.storage.{BlockId, StorageLevel, TestBlockId}
 
 import scala.collection.mutable.{ArrayBuffer, StringBuilder}
 
+// private[spark] because we need to register them in Kryo
 private[spark] case class GetBlock(id: BlockId)
 private[spark] case class GotBlock(id: BlockId, data: ByteBuffer)
 private[spark] case class PutBlock(id: BlockId, data: ByteBuffer, level: StorageLevel)
 
-private[spark] class BlockMessage() {
+private[nio] class BlockMessage() {
   // Un-initialized: typ = 0
   // GetBlock: typ = 1
   // GotBlock: typ = 2
@@ -158,7 +159,7 @@ private[spark] class BlockMessage() {
   }
 }
 
-private[spark] object BlockMessage {
+private[nio] object BlockMessage {
   val TYPE_NON_INITIALIZED: Int = 0
   val TYPE_GET_BLOCK: Int = 1
   val TYPE_GOT_BLOCK: Int = 2
@@ -193,16 +194,4 @@ private[spark] object BlockMessage {
     newBlockMessage.set(putBlock)
     newBlockMessage
   }
-
-  def main(args: Array[String]) {
-    val B = new BlockMessage()
-    val blockId = TestBlockId("ABC")
-    B.set(new PutBlock(blockId, ByteBuffer.allocate(10), StorageLevel.MEMORY_AND_DISK_SER_2))
-    val bMsg = B.toBufferMessage
-    val C = new BlockMessage()
-    C.set(bMsg)
-
-    println(B.getId + " " + B.getLevel)
-    println(C.getId + " " + C.getLevel)
-  }
 }
```
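The deleted main() was an ad-hoc round-trip check. If that coverage were worth keeping, it could live as a test-style sketch like the one below; since BlockMessage is now private[nio], it would have to sit inside that package (the object name and assertions here are assumptions, not part of the commit):

```scala
package org.apache.spark.network.nio

import java.nio.ByteBuffer

import org.apache.spark.storage.{StorageLevel, TestBlockId}

// Hypothetical replacement for the removed ad-hoc main() check.
object BlockMessageRoundTrip {
  def check(): Unit = {
    // Encode a PutBlock into the wire format and decode it back.
    val original = new BlockMessage()
    original.set(PutBlock(TestBlockId("ABC"), ByteBuffer.allocate(10),
      StorageLevel.MEMORY_AND_DISK_SER_2))

    val decoded = new BlockMessage()
    decoded.set(original.toBufferMessage)

    // The decoded message should carry the same id and storage level.
    assert(original.getId == decoded.getId)
    assert(original.getLevel == decoded.getLevel)
  }
}
```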

core/src/main/scala/org/apache/spark/network/cm/BlockMessageArray.scala renamed to core/src/main/scala/org/apache/spark/network/nio/BlockMessageArray.scala
Lines changed: 3 additions & 3 deletions

```diff
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.spark.network.cm
+package org.apache.spark.network.nio
 
 import java.nio.ByteBuffer
 
@@ -24,7 +24,7 @@ import org.apache.spark.storage.{StorageLevel, TestBlockId}
 
 import scala.collection.mutable.ArrayBuffer
 
-private[spark]
+private[nio]
 class BlockMessageArray(var blockMessages: Seq[BlockMessage])
   extends Seq[BlockMessage] with Logging {
 
@@ -102,7 +102,7 @@ class BlockMessageArray(var blockMessages: Seq[BlockMessage])
   }
 }
 
-private[spark] object BlockMessageArray {
+private[nio] object BlockMessageArray {
 
   def fromBufferMessage(bufferMessage: BufferMessage): BlockMessageArray = {
     val newBlockMessageArray = new BlockMessageArray()
```

core/src/main/scala/org/apache/spark/network/cm/BufferMessage.scala renamed to core/src/main/scala/org/apache/spark/network/nio/BufferMessage.scala
Lines changed: 4 additions & 3 deletions

```diff
@@ -15,15 +15,16 @@
  * limitations under the License.
  */
 
-package org.apache.spark.network.cm
+package org.apache.spark.network.nio
 
 import java.nio.ByteBuffer
 
+import scala.collection.mutable.ArrayBuffer
+
 import org.apache.spark.storage.BlockManager
 
-import scala.collection.mutable.ArrayBuffer
 
-private[spark]
+private[nio]
 class BufferMessage(id_ : Int, val buffers: ArrayBuffer[ByteBuffer], var ackId: Int)
   extends Message(Message.BUFFER_MESSAGE, id_) {
```
core/src/main/scala/org/apache/spark/network/cm/Connection.scala renamed to core/src/main/scala/org/apache/spark/network/nio/Connection.scala
Lines changed: 3 additions & 3 deletions

```diff
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.spark.network.cm
+package org.apache.spark.network.nio
 
 import java.net._
 import java.nio._
@@ -25,7 +25,7 @@ import org.apache.spark._
 
 import scala.collection.mutable.{ArrayBuffer, HashMap, Queue}
 
-private[spark]
+private[nio]
 abstract class Connection(val channel: SocketChannel, val selector: Selector,
     val socketRemoteConnectionManagerId: ConnectionManagerId, val connectionId: ConnectionId)
   extends Logging {
@@ -190,7 +190,7 @@ abstract class Connection(val channel: SocketChannel, val selector: Selector,
 }
 
 
-private[spark]
+private[nio]
 class SendingConnection(val address: InetSocketAddress, selector_ : Selector,
     remoteId_ : ConnectionManagerId, id_ : ConnectionId)
   extends Connection(SocketChannel.open, selector_, remoteId_, id_) {
```

core/src/main/scala/org/apache/spark/network/cm/ConnectionId.scala renamed to core/src/main/scala/org/apache/spark/network/nio/ConnectionId.scala
Lines changed: 3 additions & 3 deletions

```diff
@@ -15,13 +15,13 @@
  * limitations under the License.
  */
 
-package org.apache.spark.network.cm
+package org.apache.spark.network.nio
 
-private[spark] case class ConnectionId(connectionManagerId: ConnectionManagerId, uniqId: Int) {
+private[nio] case class ConnectionId(connectionManagerId: ConnectionManagerId, uniqId: Int) {
   override def toString = connectionManagerId.host + "_" + connectionManagerId.port + "_" + uniqId
 }
 
-private[spark] object ConnectionId {
+private[nio] object ConnectionId {
 
   def createConnectionIdFromString(connectionIdString: String): ConnectionId = {
     val res = connectionIdString.split("_").map(_.trim())
```
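The codec above implies a `host_port_uniqId` wire format. A hedged round-trip sketch (values illustrative; like BlockMessage, this would need to sit inside the nio package now that the types are private[nio], and it assumes ConnectionManagerId is the usual host/port case class):

```scala
// Round-trip through the underscore-delimited string format.
val id = ConnectionId(ConnectionManagerId("host-a", 7077), uniqId = 42)
val encoded = id.toString  // "host-a_7077_42"
assert(ConnectionId.createConnectionIdFromString(encoded) == id)
```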

core/src/main/scala/org/apache/spark/network/cm/ConnectionManager.scala renamed to core/src/main/scala/org/apache/spark/network/nio/ConnectionManager.scala
Lines changed: 6 additions & 5 deletions

```diff
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.spark.network.cm
+package org.apache.spark.network.nio
 
 import java.io.IOException
 import java.net._
@@ -26,15 +26,16 @@ import java.util.concurrent.atomic.AtomicInteger
 import java.util.concurrent.{LinkedBlockingDeque, ThreadPoolExecutor, TimeUnit}
 import java.util.{Timer, TimerTask}
 
-import org.apache.spark._
-import org.apache.spark.util.{SystemClock, Utils}
-
 import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, SynchronizedMap, SynchronizedQueue}
 import scala.concurrent.duration._
 import scala.concurrent.{Await, ExecutionContext, Future, Promise}
 import scala.language.postfixOps
 
-private[spark] class ConnectionManager(
+import org.apache.spark._
+import org.apache.spark.util.{SystemClock, Utils}
+
+
+private[nio] class ConnectionManager(
     port: Int,
     conf: SparkConf,
     securityManager: SecurityManager,
```
