@@ -20,9 +20,12 @@ package org.apache.spark.storage
 import java.io._
 import java.nio.{ByteBuffer, MappedByteBuffer}
 
+import com.twitter.chill.ResourcePool
+
 import scala.collection.mutable.{ArrayBuffer, HashMap}
 import scala.concurrent.{ExecutionContext, Await, Future}
 import scala.concurrent.duration._
+import scala.reflect.ClassTag
 import scala.util.control.NonFatal
 import scala.util.Random
 
@@ -37,7 +40,7 @@ import org.apache.spark.network.netty.SparkTransportConf
 import org.apache.spark.network.shuffle.ExternalShuffleClient
 import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo
 import org.apache.spark.rpc.RpcEnv
-import org.apache.spark.serializer.{SerializerInstance, Serializer}
+import org.apache.spark.serializer.{SerializationStream, DeserializationStream, SerializerInstance, Serializer}
 import org.apache.spark.shuffle.ShuffleManager
 import org.apache.spark.shuffle.hash.HashShuffleManager
 import org.apache.spark.util._
@@ -53,6 +56,56 @@ private[spark] class BlockResult(
     val readMethod: DataReadMethod.Value,
     val bytes: Long)
 
+private[storage] class BlockManagerSerializerPool(serializer: Serializer, size: Int)
+  extends ResourcePool[SerializerInstance](size) {
+  override def newInstance(): SerializerInstance = serializer.newInstance()
+}
+
+private[storage] class BlockManagerSerializerInstance(serializer: Serializer)
+  extends SerializerInstance {
+  @transient lazy val instancePool = new BlockManagerSerializerPool(serializer, 30)
+
+  private[this] def acquireRelease[O](fn: SerializerInstance => O): O = {
+    val serializerInstance = instancePool.borrow
+    try {
+      fn(serializerInstance)
+    } finally {
+      instancePool.release(serializerInstance)
+    }
+  }
+
+  def serialize[T: ClassTag](o: T): ByteBuffer =
+    acquireRelease { serializerInstance =>
+      serializerInstance.serialize(o)
+    }
+
+  def deserialize[T: ClassTag](bytes: ByteBuffer): T =
+    acquireRelease { serializerInstance =>
+      serializerInstance.deserialize[T](bytes)
+    }
+
+  override def deserialize[T: ClassTag](bytes: ByteBuffer, loader: ClassLoader): T =
+    acquireRelease { serializerInstance =>
+      serializerInstance.deserialize(bytes, loader)
+    }
+
+  override def serializeStream(s: OutputStream): SerializationStream =
+    acquireRelease { serializerInstance =>
+      serializerInstance.serializeStream(s)
+    }
+
+  override def deserializeStream(s: InputStream): DeserializationStream =
+    acquireRelease { serializerInstance =>
+      serializerInstance.deserializeStream(s)
+    }
+}
+
+private[storage] class BlockManagerSerializer(serializer: Serializer) extends Serializer {
+  val blockManagerSerializerInstance = new BlockManagerSerializerInstance(serializer)
+
+  override def newInstance(): SerializerInstance = blockManagerSerializerInstance
+}
+
 /**
  * Manager running on every node (driver and executors) which provides interfaces for putting and
  * retrieving blocks both locally and remotely into various stores (memory, disk, and off-heap).
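
The three classes above form a simple pooling layer: `BlockManagerSerializer` hands out one shared `SerializerInstance` facade, and that facade borrows a real instance from a fixed-size `ResourcePool` (30 slots here) around each call, so hot paths stop paying for `serializer.newInstance()` on every operation. One caveat worth flagging: `serializeStream` and `deserializeStream` release the borrowed instance as soon as the stream is created, so the returned stream keeps using an instance that is already back in the pool; that is only safe if concurrent use of the same underlying instance is tolerable. Below is a minimal, self-contained sketch of the same borrow/try/finally pattern; the object name and the choice of `KryoSerializer` are illustrative, not part of the patch.

```scala
import java.nio.ByteBuffer

import com.twitter.chill.ResourcePool

import org.apache.spark.SparkConf
import org.apache.spark.serializer.{KryoSerializer, SerializerInstance}

// Illustrative sketch (not in the patch): a fixed-size pool of serializer
// instances, borrowed and released around each one-shot (de)serialization,
// mirroring acquireRelease above.
object PooledSerializerSketch {
  def main(args: Array[String]): Unit = {
    val serializer = new KryoSerializer(new SparkConf())
    val pool = new ResourcePool[SerializerInstance](30) {
      override def newInstance(): SerializerInstance = serializer.newInstance()
    }

    val instance = pool.borrow  // take an instance from the pool
    try {
      val bytes: ByteBuffer = instance.serialize("hello")
      val back = instance.deserialize[String](bytes)
      assert(back == "hello")
    } finally {
      pool.release(instance)    // always return it, even on failure
    }
  }
}
```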
@@ -77,6 +130,8 @@ private[spark] class BlockManager(
 
   private val blockInfo = new TimeStampedHashMap[BlockId, BlockInfo]
 
+  private val pooledDefaultSerializer = new BlockManagerSerializer(defaultSerializer)
+
   private val futureExecutionContext = ExecutionContext.fromExecutorService(
     ThreadUtils.newDaemonCachedThreadPool("block-manager-future", 128))
 
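
With `pooledDefaultSerializer` in place, every `newInstance()` call on it returns the same pool-backed facade. A hypothetical check (assumes a `defaultSerializer: Serializer` in scope, as inside `BlockManager`):

```scala
// Illustrative only: BlockManagerSerializer caches one facade, so repeated
// newInstance() calls are allocation-free and return the very same object.
val pooled = new BlockManagerSerializer(defaultSerializer)
assert(pooled.newInstance() eq pooled.newInstance())
```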
@@ -1211,7 +1266,7 @@ private[spark] class BlockManager(
       blockId: BlockId,
       outputStream: OutputStream,
       values: Iterator[Any],
-      serializer: Serializer = defaultSerializer): Unit = {
+      serializer: Serializer = pooledDefaultSerializer): Unit = {
     val byteStream = new BufferedOutputStream(outputStream)
     val ser = serializer.newInstance()
     ser.serializeStream(wrapForCompression(blockId, byteStream)).writeAll(values).close()
@@ -1221,7 +1276,7 @@ private[spark] class BlockManager(
   def dataSerialize(
       blockId: BlockId,
       values: Iterator[Any],
-      serializer: Serializer = defaultSerializer): ByteBuffer = {
+      serializer: Serializer = pooledDefaultSerializer): ByteBuffer = {
     val byteStream = new ByteArrayOutputStream(4096)
     dataSerializeStream(blockId, byteStream, values, serializer)
     ByteBuffer.wrap(byteStream.toByteArray)
@@ -1234,7 +1289,7 @@ private[spark] class BlockManager(
   def dataDeserialize(
       blockId: BlockId,
       bytes: ByteBuffer,
-      serializer: Serializer = defaultSerializer): Iterator[Any] = {
+      serializer: Serializer = pooledDefaultSerializer): Iterator[Any] = {
     bytes.rewind()
     dataDeserializeStream(blockId, new ByteBufferInputStream(bytes, true), serializer)
   }
@@ -1246,7 +1301,7 @@ private[spark] class BlockManager(
   def dataDeserializeStream(
       blockId: BlockId,
       inputStream: InputStream,
-      serializer: Serializer = defaultSerializer): Iterator[Any] = {
+      serializer: Serializer = pooledDefaultSerializer): Iterator[Any] = {
     val stream = new BufferedInputStream(inputStream)
     serializer.newInstance().deserializeStream(wrapForCompression(blockId, stream)).asIterator
   }
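
Since the four `defaultSerializer` defaults are swapped for `pooledDefaultSerializer`, any caller that omits the serializer argument now goes through the pool transparently. A hypothetical round trip (assumes a constructed `blockManager` in scope; `TestBlockId` is the existing test block id type in this package):

```scala
import java.nio.ByteBuffer

// Hypothetical caller: the serializer argument is omitted, so both calls use
// the pooled default serializer introduced by this patch.
val blockId = TestBlockId("pooled-roundtrip")
val buffer: ByteBuffer = blockManager.dataSerialize(blockId, Iterator("a", "b", "c"))
val values: Iterator[Any] = blockManager.dataDeserialize(blockId, buffer)
assert(values.toList == List("a", "b", "c"))
```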