Skip to content

Commit 07ccf0d

Browse files
committed
Added init check to CMBlockTransferService.
1 parent 98c668a commit 07ccf0d

File tree

1 file changed

+14
-2
lines changed

1 file changed

+14
-2
lines changed

core/src/main/scala/org/apache/spark/network/cm/CMBlockTransferService.scala

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,12 +41,18 @@ final class CMBlockTransferService(conf: SparkConf, securityManager: SecurityMan
4141
/**
4242
* Port number the service is listening on, available only after [[init]] is invoked.
4343
*/
44-
override def port: Int = cm.id.port
44+
override def port: Int = {
45+
checkInit()
46+
cm.id.port
47+
}
4548

4649
/**
4750
* Host name the service is listening on, available only after [[init]] is invoked.
4851
*/
49-
override def hostName: String = cm.id.host
52+
override def hostName: String = {
53+
checkInit()
54+
cm.id.host
55+
}
5056

5157
/**
5258
* Initialize the transfer service by giving it the BlockDataManager that can be used to fetch
@@ -76,6 +82,7 @@ final class CMBlockTransferService(conf: SparkConf, securityManager: SecurityMan
7682
port: Int,
7783
blockIds: Seq[String],
7884
listener: BlockFetchingListener): Unit = {
85+
checkInit()
7986

8087
val cmId = new ConnectionManagerId(hostName, port)
8188
val blockMessageArray = new BlockMessageArray(blockIds.map { blockId =>
@@ -118,6 +125,7 @@ final class CMBlockTransferService(conf: SparkConf, securityManager: SecurityMan
118125
blockId: String,
119126
blockData: ManagedBuffer,
120127
level: StorageLevel) {
128+
checkInit()
121129
val msg = PutBlock(BlockId(blockId), blockData.byteBuffer(), level)
122130
val blockMessageArray = new BlockMessageArray(BlockMessage.fromPutBlock(msg))
123131
val remoteCmId = new ConnectionManagerId(hostName, port)
@@ -127,6 +135,10 @@ final class CMBlockTransferService(conf: SparkConf, securityManager: SecurityMan
127135
Duration.Inf)
128136
}
129137

138+
private def checkInit(): Unit = if (cm == null) {
139+
throw new IllegalStateException(getClass.getName + " has not been initialized")
140+
}
141+
130142
private def onBlockMessageReceive(msg: Message, id: ConnectionManagerId): Option[Message] = {
131143
logDebug("Handling message " + msg)
132144
msg match {

0 commit comments

Comments
 (0)