From 492c06ad0c50fd9f4d2280058be076e092d35619 Mon Sep 17 00:00:00 2001
From: Maciej Kisiel
Date: Thu, 23 Jul 2015 13:47:24 -0700
Subject: [PATCH] [SPARK-5269] [CORE] Use a SerializerInstance pool in
 BlockManager.

---
 .../apache/spark/storage/BlockManager.scala   | 65 +++++++++++++++++--
 1 file changed, 60 insertions(+), 5 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index d31aa68eb695..6fb23acebbc5 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -20,9 +20,12 @@ package org.apache.spark.storage
 import java.io._
 import java.nio.{ByteBuffer, MappedByteBuffer}
 
+import com.twitter.chill.ResourcePool
+
 import scala.collection.mutable.{ArrayBuffer, HashMap}
 import scala.concurrent.{ExecutionContext, Await, Future}
 import scala.concurrent.duration._
+import scala.reflect.ClassTag
 import scala.util.control.NonFatal
 import scala.util.Random
 
@@ -37,7 +40,7 @@ import org.apache.spark.network.netty.SparkTransportConf
 import org.apache.spark.network.shuffle.ExternalShuffleClient
 import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo
 import org.apache.spark.rpc.RpcEnv
-import org.apache.spark.serializer.{SerializerInstance, Serializer}
+import org.apache.spark.serializer.{SerializationStream, DeserializationStream, SerializerInstance, Serializer}
 import org.apache.spark.shuffle.ShuffleManager
 import org.apache.spark.shuffle.hash.HashShuffleManager
 import org.apache.spark.util._
@@ -53,6 +56,56 @@ private[spark] class BlockResult(
     val readMethod: DataReadMethod.Value,
     val bytes: Long)
 
+private[storage] class BlockManagerSerializerPool(serializer: Serializer, size: Int)
+  extends ResourcePool[SerializerInstance](size) {
+  override def newInstance(): SerializerInstance = serializer.newInstance()
+}
+
+private[storage] class BlockManagerSerializerInstance(serializer: Serializer)
+  extends SerializerInstance {
+  @transient lazy val instancePool = new BlockManagerSerializerPool(serializer, 30)
+
+  // Borrow a pooled instance for the duration of `fn`, returning it afterwards.
+  private[this] def acquireRelease[O](fn: SerializerInstance => O): O = {
+    val serializerInstance = instancePool.borrow
+    try {
+      fn(serializerInstance)
+    } finally {
+      instancePool.release(serializerInstance)
+    }
+  }
+
+  def serialize[T: ClassTag](o: T): ByteBuffer =
+    acquireRelease { serializerInstance =>
+      serializerInstance.serialize(o)
+    }
+
+  def deserialize[T: ClassTag](bytes: ByteBuffer): T =
+    acquireRelease { serializerInstance =>
+      serializerInstance.deserialize[T](bytes)
+    }
+
+  override def deserialize[T: ClassTag](bytes: ByteBuffer, loader: ClassLoader): T =
+    acquireRelease { serializerInstance =>
+      serializerInstance.deserialize(bytes, loader)
+    }
+
+  // The returned streams keep using the underlying instance after this method
+  // returns, so a pooled instance must not be released here; create a dedicated
+  // instance instead of borrowing one.
+  override def serializeStream(s: OutputStream): SerializationStream =
+    serializer.newInstance().serializeStream(s)
+
+  override def deserializeStream(s: InputStream): DeserializationStream =
+    serializer.newInstance().deserializeStream(s)
+}
+
+private[storage] class BlockManagerSerializer(serializer: Serializer) extends Serializer {
+  val blockManagerSerializerInstance = new BlockManagerSerializerInstance(serializer)
+
+  override def newInstance(): SerializerInstance = blockManagerSerializerInstance
+}
+
 /**
  * Manager running on every node (driver and executors) which provides interfaces for putting and
  * retrieving blocks both locally and remotely into various stores (memory, disk, and off-heap).
@@ -77,6 +130,8 @@ private[spark] class BlockManager(
 
   private val blockInfo = new TimeStampedHashMap[BlockId, BlockInfo]
 
+  private val pooledDefaultSerializer = new BlockManagerSerializer(defaultSerializer)
+
   private val futureExecutionContext = ExecutionContext.fromExecutorService(
     ThreadUtils.newDaemonCachedThreadPool("block-manager-future", 128))
 
@@ -1211,7 +1266,7 @@ private[spark] class BlockManager(
       blockId: BlockId,
       outputStream: OutputStream,
       values: Iterator[Any],
-      serializer: Serializer = defaultSerializer): Unit = {
+      serializer: Serializer = pooledDefaultSerializer): Unit = {
     val byteStream = new BufferedOutputStream(outputStream)
     val ser = serializer.newInstance()
     ser.serializeStream(wrapForCompression(blockId, byteStream)).writeAll(values).close()
@@ -1221,7 +1276,7 @@
   def dataSerialize(
       blockId: BlockId,
       values: Iterator[Any],
-      serializer: Serializer = defaultSerializer): ByteBuffer = {
+      serializer: Serializer = pooledDefaultSerializer): ByteBuffer = {
     val byteStream = new ByteArrayOutputStream(4096)
     dataSerializeStream(blockId, byteStream, values, serializer)
     ByteBuffer.wrap(byteStream.toByteArray)
@@ -1234,7 +1289,7 @@
   def dataDeserialize(
       blockId: BlockId,
       bytes: ByteBuffer,
-      serializer: Serializer = defaultSerializer): Iterator[Any] = {
+      serializer: Serializer = pooledDefaultSerializer): Iterator[Any] = {
     bytes.rewind()
     dataDeserializeStream(blockId, new ByteBufferInputStream(bytes, true), serializer)
   }
@@ -1246,7 +1301,7 @@
   def dataDeserializeStream(
       blockId: BlockId,
       inputStream: InputStream,
-      serializer: Serializer = defaultSerializer): Iterator[Any] = {
+      serializer: Serializer = pooledDefaultSerializer): Iterator[Any] = {
     val stream = new BufferedInputStream(inputStream)
     serializer.newInstance().deserializeStream(wrapForCompression(blockId, stream)).asIterator
   }
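-- 
For reviewers unfamiliar with com.twitter.chill.ResourcePool, here is a
minimal, self-contained sketch of the borrow/release pattern the patch builds
on. The names DemoSerializerPool and DemoPoolUsage are hypothetical, and
JavaSerializer is used only to keep the sketch dependency-light; the patch
itself pools whichever defaultSerializer the BlockManager was constructed
with.

  import com.twitter.chill.ResourcePool
  import org.apache.spark.SparkConf
  import org.apache.spark.serializer.{JavaSerializer, SerializerInstance}

  // A pool of up to `size` serializer instances; ResourcePool creates them
  // lazily via newInstance() and hands them out from an internal queue.
  class DemoSerializerPool(size: Int) extends ResourcePool[SerializerInstance](size) {
    private val serializer = new JavaSerializer(new SparkConf())
    override def newInstance(): SerializerInstance = serializer.newInstance()
  }

  object DemoPoolUsage {
    def main(args: Array[String]): Unit = {
      val pool = new DemoSerializerPool(4)
      val instance = pool.borrow  // reuse an instance instead of newInstance() per call
      try {
        val bytes = instance.serialize("hello")
        println(instance.deserialize[String](bytes))
      } finally {
        pool.release(instance)    // the finally block guarantees no instance leaks
      }
    }
  }

Releasing in a finally block is the same discipline acquireRelease enforces in
BlockManagerSerializerInstance; the stream methods are the one place it cannot
apply, since the returned stream outlives the call that created it.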