Commit 44fe4b2

Refactored CompressionScheme, added 3 more compression schemes.
New schemes: BooleanBitSet, IntDelta and LongDelta
1 parent 1347ebd · commit 44fe4b2

6 files changed: +275 −106 lines
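
The implementations of the three new schemes live in files not shown in this excerpt. As a rough, hypothetical sketch of the idea behind IntDelta and LongDelta (simplified names and signatures, not the committed code): each value is stored as a one-byte delta from its predecessor when the delta fits in a byte, with an escape marker for values that do not. BooleanBitSet, as the name suggests, packs boolean values into the bits of long words.

import java.nio.ByteBuffer

// Hypothetical sketch of delta encoding for Int columns: emit a 1-byte delta
// when the difference from the previous value fits in a Byte, otherwise emit
// an escape marker followed by the full 4-byte value.
object IntDeltaSketch {
  val Escape: Byte = Byte.MinValue

  def compress(values: Array[Int], to: ByteBuffer): ByteBuffer = {
    var prev = 0
    var first = true
    for (v <- values) {
      val delta = v - prev
      // `delta > Byte.MinValue` guarantees the escape byte is never a literal delta.
      if (!first && delta > Byte.MinValue && delta <= Byte.MaxValue) {
        to.put(delta.toByte)
      } else {
        to.put(Escape).putInt(v)
      }
      prev = v
      first = false
    }
    to.flip()
    to
  }

  def decompress(from: ByteBuffer): Seq[Int] = {
    val out = Seq.newBuilder[Int]
    var prev = 0
    while (from.hasRemaining) {
      val b = from.get()
      prev = if (b == Escape) from.getInt() else prev + b
      out += prev
    }
    out.result()
  }
}

Round-tripping Array(1, 2, 3, 1000) would store the first value and the final jump as escaped literals and the two middle values as single-byte deltas: 12 bytes instead of 16.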

sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnStats.scala

Lines changed: 6 additions & 0 deletions

@@ -20,6 +20,12 @@ package org.apache.spark.sql.columnar
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.catalyst.types._
 
+/**
+ * Used to collect statistical information when building in-memory columns.
+ *
+ * NOTE: we intentionally avoid using `Ordering[T]` to compare values here because `Ordering[T]`
+ * brings significant performance penalty.
+ */
 private[sql] sealed abstract class ColumnStats[T <: DataType, JvmType] extends Serializable {
   /**
    * Closed lower bound of this column.
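
An illustration of that NOTE, with hypothetical names: specializing the stats class per primitive type lets bounds be maintained with primitive comparisons, whereas a generic Ordering[T] would box every value in the hot path.

// Hypothetical illustration (not the Spark class): direct primitive
// comparisons keep the gather loop free of boxing.
class IntColumnStatsSketch {
  private var lower = Int.MaxValue
  private var upper = Int.MinValue

  def gatherStats(value: Int): Unit = {
    if (value < lower) lower = value  // primitive <, no Ordering[Int].lt(...)
    if (value > upper) upper = value
  }

  def bounds: (Int, Int) = (lower, upper)
}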

sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/CompressibleColumnBuilder.scala

Lines changed: 3 additions & 3 deletions

@@ -47,9 +47,9 @@ private[sql] trait CompressibleColumnBuilder[T <: NativeType]
 
   import CompressionScheme._
 
-  val compressionEncoders = schemes.filter(_.supports(columnType)).map(_.encoder)
+  val compressionEncoders = schemes.filter(_.supports(columnType)).map(_.encoder[T])
 
-  protected def isWorthCompressing(encoder: Encoder) = {
+  protected def isWorthCompressing(encoder: Encoder[T]) = {
     encoder.compressionRatio < 0.8
   }
 

@@ -70,7 +70,7 @@ private[sql] trait CompressibleColumnBuilder[T <: NativeType]
 
   abstract override def build() = {
     val rawBuffer = super.build()
-    val encoder = {
+    val encoder: Encoder[T] = {
       val candidate = compressionEncoders.minBy(_.compressionRatio)
       if (isWorthCompressing(candidate)) candidate else PassThrough.encoder
     }
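
The builder asks every applicable encoder for a size estimate, picks the one with the lowest estimated ratio, and only commits to it if it would save at least 20% (ratio < 0.8); otherwise the raw buffer is kept via PassThrough. A standalone sketch of that selection rule, with hypothetical names:

// Hypothetical, simplified mirror of the selection logic above.
case class EstimatedEncoder(name: String, compressedSize: Int, uncompressedSize: Int) {
  def compressionRatio: Double =
    if (uncompressedSize > 0) compressedSize.toDouble / uncompressedSize else 1.0
}

def chooseEncoder(candidates: Seq[EstimatedEncoder], passThrough: EstimatedEncoder): EstimatedEncoder = {
  val best = candidates.minBy(_.compressionRatio)
  // Only worth compressing if it saves more than 20% of the raw size.
  if (best.compressionRatio < 0.8) best else passThrough
}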

sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/CompressionScheme.scala

Lines changed: 13 additions & 15 deletions

@@ -22,10 +22,8 @@ import java.nio.ByteBuffer
 import org.apache.spark.sql.catalyst.types.NativeType
 import org.apache.spark.sql.columnar.{ColumnType, NativeColumnType}
 
-private[sql] trait Encoder {
-  def gatherCompressibilityStats[T <: NativeType](
-      value: T#JvmType,
-      columnType: ColumnType[T, T#JvmType]) {}
+private[sql] trait Encoder[T <: NativeType] {
+  def gatherCompressibilityStats(value: T#JvmType, columnType: NativeColumnType[T]) {}
 
   def compressedSize: Int
 
@@ -35,10 +33,7 @@ private[sql] trait Encoder
     if (uncompressedSize > 0) compressedSize.toDouble / uncompressedSize else 1.0
   }
 
-  def compress[T <: NativeType](
-      from: ByteBuffer,
-      to: ByteBuffer,
-      columnType: ColumnType[T, T#JvmType]): ByteBuffer
+  def compress(from: ByteBuffer, to: ByteBuffer, columnType: NativeColumnType[T]): ByteBuffer
 }
 
 private[sql] trait Decoder[T <: NativeType] extends Iterator[T#JvmType]

@@ -48,7 +43,7 @@ private[sql] trait CompressionScheme {
 
   def supports(columnType: ColumnType[_, _]): Boolean
 
-  def encoder: Encoder
+  def encoder[T <: NativeType]: Encoder[T]
 
   def decoder[T <: NativeType](buffer: ByteBuffer, columnType: NativeColumnType[T]): Decoder[T]
 }
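
The effect of moving T from the individual methods onto the trait is that an encoder instance is now tied to one column type and can keep state of that type between calls. A sketch against simplified stand-in traits (hypothetical names, not the Spark classes):

// Stand-in for NativeType: only the JvmType member matters here.
trait NativeTypeLike { type JvmType }

trait EncoderLike[T <: NativeTypeLike] {
  def gatherCompressibilityStats(value: T#JvmType): Unit = {}
  def compressedSize: Int
  def uncompressedSize: Int
  def compressionRatio: Double =
    if (uncompressedSize > 0) compressedSize.toDouble / uncompressedSize else 1.0
}

// A run-length-style size estimator can now remember the previous value with
// its precise type; under the old signature, T was local to each call, so no
// typed field could carry state from one gather call to the next.
class RunLengthSizeEstimator[T <: NativeTypeLike](valueSize: Int) extends EncoderLike[T] {
  private var last: Option[T#JvmType] = None
  private var runs = 0
  private var count = 0

  override def gatherCompressibilityStats(value: T#JvmType): Unit = {
    if (!last.contains(value)) runs += 1
    last = Some(value)
    count += 1
  }

  def compressedSize = runs * (valueSize + 4)  // value + 4-byte run length per run
  def uncompressedSize = count * valueSize
}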
@@ -58,15 +53,18 @@ private[sql] trait WithCompressionSchemes {
 }
 
 private[sql] trait AllCompressionSchemes extends WithCompressionSchemes {
-  override val schemes: Seq[CompressionScheme] = {
-    Seq(PassThrough, RunLengthEncoding, DictionaryEncoding)
-  }
+  override val schemes: Seq[CompressionScheme] = CompressionScheme.all
 }
 
 private[sql] object CompressionScheme {
-  def apply(typeId: Int): CompressionScheme = typeId match {
-    case PassThrough.typeId => PassThrough
-    case _ => throw new UnsupportedOperationException()
+  val all: Seq[CompressionScheme] =
+    Seq(PassThrough, RunLengthEncoding, DictionaryEncoding, BooleanBitSet, IntDelta, LongDelta)
+
+  private val typeIdToScheme = all.map(scheme => scheme.typeId -> scheme).toMap
+
+  def apply(typeId: Int): CompressionScheme = {
+    typeIdToScheme.getOrElse(typeId, throw new UnsupportedOperationException(
+      s"Unrecognized compression scheme type ID: $typeId"))
   }
 
   def copyColumnHeader(from: ByteBuffer, to: ByteBuffer) {