From 5200ef0e7e55be698ed388c4501f9472d8f29673 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Mon, 16 Oct 2017 13:46:48 +0000 Subject: [PATCH 1/2] Simplify dictionary initialization and remove unused method. --- .../execution/columnar/ColumnAccessor.scala | 3 -- .../compression/compressionSchemes.scala | 33 +++++-------------- 2 files changed, 8 insertions(+), 28 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/ColumnAccessor.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/ColumnAccessor.scala index 24c8ac81420c..83df33617583 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/ColumnAccessor.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/ColumnAccessor.scala @@ -63,9 +63,6 @@ private[columnar] abstract class BasicColumnAccessor[JvmType]( } protected def underlyingBuffer = buffer - - def getByteBuffer: ByteBuffer = - buffer.duplicate.order(ByteOrder.nativeOrder()) } private[columnar] class NullColumnAccessor(buffer: ByteBuffer) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/compression/compressionSchemes.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/compression/compressionSchemes.scala index bf00ad997c76..88c07afb33ea 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/compression/compressionSchemes.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/compression/compressionSchemes.scala @@ -452,31 +452,10 @@ private[columnar] case object DictionaryEncoding extends CompressionScheme { class Decoder[T <: AtomicType](buffer: ByteBuffer, columnType: NativeColumnType[T]) extends compression.Decoder[T] { - val elementNum = ByteBufferHelper.getInt(buffer) - private val dictionary: Array[Any] = new Array[Any](elementNum) - private var intDictionary: Array[Int] = null - private var longDictionary: Array[Long] = null - - columnType.dataType match { - case _: IntegerType => - intDictionary = new Array[Int](elementNum) - for (i <- 0 until elementNum) { - val v = columnType.extract(buffer).asInstanceOf[Int] - intDictionary(i) = v - dictionary(i) = v - } - case _: LongType => - longDictionary = new Array[Long](elementNum) - for (i <- 0 until elementNum) { - val v = columnType.extract(buffer).asInstanceOf[Long] - longDictionary(i) = v - dictionary(i) = v - } - case _: StringType => - for (i <- 0 until elementNum) { - val v = columnType.extract(buffer).asInstanceOf[Any] - dictionary(i) = v - } + + private val dictionary: Array[Any] = { + val elementNum = ByteBufferHelper.getInt(buffer) + Array.fill[Any](elementNum)(columnType.extract(buffer).asInstanceOf[Any]) } override def next(row: InternalRow, ordinal: Int): Unit = { @@ -495,6 +474,8 @@ private[columnar] case object DictionaryEncoding extends CompressionScheme { columnType.dataType match { case _: IntegerType => val dictionaryIds = columnVector.reserveDictionaryIds(capacity) + val intDictionary = dictionary.map(_.asInstanceOf[Int]) + columnVector.setDictionary(new ColumnDictionary(intDictionary)) while (pos < capacity) { if (pos != nextNullIndex) { @@ -508,6 +489,8 @@ private[columnar] case object DictionaryEncoding extends CompressionScheme { } case _: LongType => val dictionaryIds = columnVector.reserveDictionaryIds(capacity) + val longDictionary = dictionary.map(_.asInstanceOf[Long]) + columnVector.setDictionary(new ColumnDictionary(longDictionary)) while (pos < capacity) { if (pos != nextNullIndex) { From 25003cfd324071fed5eacc0fde6420f81516bcea Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Thu, 19 Oct 2017 02:30:28 +0000 Subject: [PATCH 2/2] Revert dictionary change for avoiding boxing. --- .../compression/compressionSchemes.scala | 33 ++++++++++++++----- 1 file changed, 25 insertions(+), 8 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/compression/compressionSchemes.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/compression/compressionSchemes.scala index 88c07afb33ea..bf00ad997c76 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/compression/compressionSchemes.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/compression/compressionSchemes.scala @@ -452,10 +452,31 @@ private[columnar] case object DictionaryEncoding extends CompressionScheme { class Decoder[T <: AtomicType](buffer: ByteBuffer, columnType: NativeColumnType[T]) extends compression.Decoder[T] { - - private val dictionary: Array[Any] = { - val elementNum = ByteBufferHelper.getInt(buffer) - Array.fill[Any](elementNum)(columnType.extract(buffer).asInstanceOf[Any]) + val elementNum = ByteBufferHelper.getInt(buffer) + private val dictionary: Array[Any] = new Array[Any](elementNum) + private var intDictionary: Array[Int] = null + private var longDictionary: Array[Long] = null + + columnType.dataType match { + case _: IntegerType => + intDictionary = new Array[Int](elementNum) + for (i <- 0 until elementNum) { + val v = columnType.extract(buffer).asInstanceOf[Int] + intDictionary(i) = v + dictionary(i) = v + } + case _: LongType => + longDictionary = new Array[Long](elementNum) + for (i <- 0 until elementNum) { + val v = columnType.extract(buffer).asInstanceOf[Long] + longDictionary(i) = v + dictionary(i) = v + } + case _: StringType => + for (i <- 0 until elementNum) { + val v = columnType.extract(buffer).asInstanceOf[Any] + dictionary(i) = v + } } override def next(row: InternalRow, ordinal: Int): Unit = { @@ -474,8 +495,6 @@ private[columnar] case object DictionaryEncoding extends CompressionScheme { columnType.dataType match { case _: IntegerType => val dictionaryIds = columnVector.reserveDictionaryIds(capacity) - val intDictionary = dictionary.map(_.asInstanceOf[Int]) - columnVector.setDictionary(new ColumnDictionary(intDictionary)) while (pos < capacity) { if (pos != nextNullIndex) { @@ -489,8 +508,6 @@ private[columnar] case object DictionaryEncoding extends CompressionScheme { } case _: LongType => val dictionaryIds = columnVector.reserveDictionaryIds(capacity) - val longDictionary = dictionary.map(_.asInstanceOf[Long]) - columnVector.setDictionary(new ColumnDictionary(longDictionary)) while (pos < capacity) { if (pos != nextNullIndex) {