@@ -41,12 +41,18 @@ final class CMBlockTransferService(conf: SparkConf, securityManager: SecurityMan
4141 /**
4242 * Port number the service is listening on, available only after [[init ]] is invoked.
4343 */
44- override def port : Int = cm.id.port
44+ override def port : Int = {
45+ checkInit()
46+ cm.id.port
47+ }
4548
4649 /**
4750 * Host name the service is listening on, available only after [[init ]] is invoked.
4851 */
49- override def hostName : String = cm.id.host
52+ override def hostName : String = {
53+ checkInit()
54+ cm.id.host
55+ }
5056
5157 /**
5258 * Initialize the transfer service by giving it the BlockDataManager that can be used to fetch
@@ -76,6 +82,7 @@ final class CMBlockTransferService(conf: SparkConf, securityManager: SecurityMan
7682 port : Int ,
7783 blockIds : Seq [String ],
7884 listener : BlockFetchingListener ): Unit = {
85+ checkInit()
7986
8087 val cmId = new ConnectionManagerId (hostName, port)
8188 val blockMessageArray = new BlockMessageArray (blockIds.map { blockId =>
@@ -118,6 +125,7 @@ final class CMBlockTransferService(conf: SparkConf, securityManager: SecurityMan
118125 blockId : String ,
119126 blockData : ManagedBuffer ,
120127 level : StorageLevel ) {
128+ checkInit()
121129 val msg = PutBlock (BlockId (blockId), blockData.byteBuffer(), level)
122130 val blockMessageArray = new BlockMessageArray (BlockMessage .fromPutBlock(msg))
123131 val remoteCmId = new ConnectionManagerId (hostName, port)
@@ -127,6 +135,10 @@ final class CMBlockTransferService(conf: SparkConf, securityManager: SecurityMan
127135 Duration .Inf )
128136 }
129137
138+ private def checkInit (): Unit = if (cm == null ) {
139+ throw new IllegalStateException (getClass.getName + " has not been initialized" )
140+ }
141+
130142 private def onBlockMessageReceive (msg : Message , id : ConnectionManagerId ): Option [Message ] = {
131143 logDebug(" Handling message " + msg)
132144 msg match {
0 commit comments