@@ -100,20 +100,21 @@ private[sql] class GenericColumnAccessor(buffer: ByteBuffer)
 
 private[sql] object ColumnAccessor {
   def apply(buffer: ByteBuffer): ColumnAccessor = {
+    val dup = buffer.duplicate().order(ByteOrder.nativeOrder)
     // The first 4 bytes in the buffer indicate the column type.
-    val columnTypeId = buffer.getInt()
+    val columnTypeId = dup.getInt()
 
     columnTypeId match {
-      case INT.typeId => new IntColumnAccessor(buffer)
-      case LONG.typeId => new LongColumnAccessor(buffer)
-      case FLOAT.typeId => new FloatColumnAccessor(buffer)
-      case DOUBLE.typeId => new DoubleColumnAccessor(buffer)
-      case BOOLEAN.typeId => new BooleanColumnAccessor(buffer)
-      case BYTE.typeId => new ByteColumnAccessor(buffer)
-      case SHORT.typeId => new ShortColumnAccessor(buffer)
-      case STRING.typeId => new StringColumnAccessor(buffer)
-      case BINARY.typeId => new BinaryColumnAccessor(buffer)
-      case GENERIC.typeId => new GenericColumnAccessor(buffer)
+      case INT.typeId => new IntColumnAccessor(dup)
+      case LONG.typeId => new LongColumnAccessor(dup)
+      case FLOAT.typeId => new FloatColumnAccessor(dup)
+      case DOUBLE.typeId => new DoubleColumnAccessor(dup)
+      case BOOLEAN.typeId => new BooleanColumnAccessor(dup)
+      case BYTE.typeId => new ByteColumnAccessor(dup)
+      case SHORT.typeId => new ShortColumnAccessor(dup)
+      case STRING.typeId => new StringColumnAccessor(dup)
+      case BINARY.typeId => new BinaryColumnAccessor(dup)
+      case GENERIC.typeId => new GenericColumnAccessor(dup)
     }
   }
 }
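The change above reads the column type ID through a duplicate of the buffer instead of the buffer itself, so the caller's read position is left untouched. A minimal standalone sketch of the ByteBuffer.duplicate() behavior this relies on (plain java.nio, not code from the PR):

import java.nio.{ByteBuffer, ByteOrder}

// The duplicate shares the underlying bytes but keeps its own position and limit;
// byte order is not inherited by the duplicate, hence the explicit nativeOrder call.
val buffer = ByteBuffer.allocate(8).order(ByteOrder.nativeOrder)
buffer.putInt(42).putInt(7).rewind()

val dup = buffer.duplicate().order(ByteOrder.nativeOrder)
val typeId = dup.getInt()        // consumes the 4-byte type ID from `dup` only

assert(typeId == 42)
assert(buffer.position() == 0)   // the original buffer can still be read from the start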
@@ -20,6 +20,12 @@ package org.apache.spark.sql.columnar
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.catalyst.types._
 
+/**
+ * Used to collect statistical information when building in-memory columns.
+ *
+ * NOTE: we intentionally avoid using `Ordering[T]` to compare values here because `Ordering[T]`
+ * brings significant performance penalty.
+ */
 private[sql] sealed abstract class ColumnStats[T <: DataType, JvmType] extends Serializable {
   /**
    * Closed lower bound of this column.
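The new scaladoc explains why the stats classes compare values directly rather than through Ordering[T]: a generic Ordering call boxes the primitive arguments on every row. An illustrative sketch of the distinction (the class below is a stand-in, not the PR's ColumnStats hierarchy):

// Direct primitive comparison: no Ordering[Int] instance, no boxing per value.
class IntUpperBound {
  private var upper = Int.MinValue

  def gatherStats(value: Int): Unit = {
    if (value > upper) upper = value
  }

  def upperBound: Int = upper
}

// The generic alternative, implicitly[Ordering[Int]].gt(value, upper), boxes both
// arguments on every call, which is the overhead the NOTE warns about.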
@@ -47,9 +47,9 @@ private[sql] trait CompressibleColumnBuilder[T <: NativeType]
 
   import CompressionScheme._
 
-  val compressionEncoders = schemes.filter(_.supports(columnType)).map(_.encoder)
+  val compressionEncoders = schemes.filter(_.supports(columnType)).map(_.encoder[T])
 
-  protected def isWorthCompressing(encoder: Encoder) = {
+  protected def isWorthCompressing(encoder: Encoder[T]) = {
     encoder.compressionRatio < 0.8
   }
 
@@ -70,7 +70,7 @@ private[sql] trait CompressibleColumnBuilder[T <: NativeType]
 
   abstract override def build() = {
     val rawBuffer = super.build()
-    val encoder = {
+    val encoder: Encoder[T] = {
       val candidate = compressionEncoders.minBy(_.compressionRatio)
       if (isWorthCompressing(candidate)) candidate else PassThrough.encoder
     }
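With the encoders now typed as Encoder[T], the selection logic in build() is unchanged: take the candidate with the lowest compression ratio, keep it only if it beats the 0.8 threshold from isWorthCompressing, and otherwise fall back to pass-through. A small sketch of just that rule, using stand-in ratio values rather than real encoders (chooseRatio is hypothetical, not part of the PR):

// 1.0 stands for the pass-through fallback: the column is stored uncompressed
// unless the best candidate saves more than 20% of the space.
def chooseRatio(candidateRatios: Seq[Double]): Double = {
  val best = candidateRatios.min
  if (best < 0.8) best else 1.0
}

assert(chooseRatio(Seq(0.95, 0.6)) == 0.6)   // a run-length-like candidate wins
assert(chooseRatio(Seq(0.95, 0.9)) == 1.0)   // nothing saves enough, keep raw bytes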
@@ -22,10 +22,8 @@ import java.nio.ByteBuffer
 import org.apache.spark.sql.catalyst.types.NativeType
 import org.apache.spark.sql.columnar.{ColumnType, NativeColumnType}
 
-private[sql] trait Encoder {
-  def gatherCompressibilityStats[T <: NativeType](
-      value: T#JvmType,
-      columnType: ColumnType[T, T#JvmType]) {}
+private[sql] trait Encoder[T <: NativeType] {
+  def gatherCompressibilityStats(value: T#JvmType, columnType: NativeColumnType[T]) {}
 
   def compressedSize: Int
 
@@ -35,10 +33,7 @@
     if (uncompressedSize > 0) compressedSize.toDouble / uncompressedSize else 1.0
   }
 
-  def compress[T <: NativeType](
-      from: ByteBuffer,
-      to: ByteBuffer,
-      columnType: ColumnType[T, T#JvmType]): ByteBuffer
+  def compress(from: ByteBuffer, to: ByteBuffer, columnType: NativeColumnType[T]): ByteBuffer
 }
 
 private[sql] trait Decoder[T <: NativeType] extends Iterator[T#JvmType]
@@ -48,7 +43,7 @@
 
   def supports(columnType: ColumnType[_, _]): Boolean
 
-  def encoder: Encoder
+  def encoder[T <: NativeType]: Encoder[T]
 
   def decoder[T <: NativeType](buffer: ByteBuffer, columnType: NativeColumnType[T]): Decoder[T]
 }
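Taken together, these hunks move the native-type parameter from the individual methods onto the Encoder trait and its factory, so a mismatched column type now fails at compile time rather than at runtime. A hedged sketch of a call site under the new signatures (buildColumn and its parameters are illustrative, not code from the PR):

import org.apache.spark.sql.catalyst.types.NativeType
import org.apache.spark.sql.columnar.NativeColumnType

// Obtain a type-parameterized encoder from a scheme and feed it every value;
// compress(from, to, columnType) would follow once the raw column buffer is built.
def buildColumn[T <: NativeType](
    values: Seq[T#JvmType],
    columnType: NativeColumnType[T],
    scheme: CompressionScheme): Encoder[T] = {
  val encoder = scheme.encoder[T]
  values.foreach(value => encoder.gatherCompressibilityStats(value, columnType))
  encoder
}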
@@ -58,15 +53,18 @@ private[sql] trait WithCompressionSchemes {
 }
 
 private[sql] trait AllCompressionSchemes extends WithCompressionSchemes {
-  override val schemes: Seq[CompressionScheme] = {
-    Seq(PassThrough, RunLengthEncoding, DictionaryEncoding)
-  }
+  override val schemes: Seq[CompressionScheme] = CompressionScheme.all
 }
 
 private[sql] object CompressionScheme {
-  def apply(typeId: Int): CompressionScheme = typeId match {
-    case PassThrough.typeId => PassThrough
-    case _ => throw new UnsupportedOperationException()
+  val all: Seq[CompressionScheme] =
+    Seq(PassThrough, RunLengthEncoding, DictionaryEncoding, BooleanBitSet, IntDelta, LongDelta)
+
+  private val typeIdToScheme = all.map(scheme => scheme.typeId -> scheme).toMap
+
+  def apply(typeId: Int): CompressionScheme = {
+    typeIdToScheme.getOrElse(typeId, throw new UnsupportedOperationException(
+      s"Unrecognized compression scheme type ID: $typeId"))
   }
 
   def copyColumnHeader(from: ByteBuffer, to: ByteBuffer) {
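With the scheme registry in place, any of the six registered schemes can be recovered from the type ID stored in a compressed column header, and an unknown ID now fails with a message that names the offending value. A brief usage sketch (assuming -1 is not a registered type ID):

// Every registered scheme round-trips through its typeId.
assert(CompressionScheme.all.forall(scheme => CompressionScheme(scheme.typeId) == scheme))

// An unknown ID raises the descriptive exception added in this change.
try {
  CompressionScheme(-1)
} catch {
  case e: UnsupportedOperationException =>
    println(e.getMessage)   // Unrecognized compression scheme type ID: -1
}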