@@ -22,25 +22,27 @@ import java.nio.{ByteBuffer, ByteOrder}
2222import org .apache .spark .sql .Row
2323import org .apache .spark .sql .catalyst .types ._
2424import org .apache .spark .sql .columnar .ColumnBuilder ._
25- import org .apache .spark .sql .execution .SparkSqlSerializer
2625
// Common interface of the column builders in this file: initialize, optionally gather
// statistics, append values taken from rows, then build the resulting ByteBuffer.
2726private [sql] trait ColumnBuilder {
2827 /**
2928 * Initializes with an approximate lower bound on the expected number of elements in this column.
3029 */
3130 def initialize (initialSize : Int , columnName : String = " " )
3231
// Statistics hook for the value at position `ordinal` of `row`. The default body is empty,
// so builders that do not override it do nothing here.
32+ def gatherStats (row : Row , ordinal : Int ) {}
33+
// Appends the value at position `ordinal` of `row` to this column (abstract here).
3334 def appendFrom (row : Row , ordinal : Int )
3435
// Returns the column contents as a ByteBuffer (abstract here).
3536 def build (): ByteBuffer
3637}
3738
38- private [sql] abstract class BasicColumnBuilder [T <: DataType , JvmType ] extends ColumnBuilder {
39+ private [sql] abstract class BasicColumnBuilder [T <: DataType , JvmType ](
40+ val columnType : ColumnType [T , JvmType ])
41+ extends ColumnBuilder {
3942
4043 private var columnName : String = _
41- protected var buffer : ByteBuffer = _
4244
43- def columnType : ColumnType [ T , JvmType ]
45+ protected var buffer : ByteBuffer = _
4446
4547 override def initialize (initialSize : Int , columnName : String = " " ) = {
4648 val size = if (initialSize == 0 ) DEFAULT_INITIAL_BUFFER_SIZE else initialSize
@@ -49,18 +51,10 @@ private[sql] abstract class BasicColumnBuilder[T <: DataType, JvmType] extends C
4951 buffer.order(ByteOrder .nativeOrder()).putInt(columnType.typeId)
5052 }
5153
52- // Have to give a concrete implementation to make mixin possible
// In the added (`+`) lines: extracts the field at `ordinal` from `row` through
// `columnType.getField`, reassigns `buffer` to the result of `ensureFreeSpace` sized by
// `columnType.actualSize(field)`, then lets `columnType` append the field to the buffer.
// The removed (`-`) lines show the previous `doAppendFrom` / `appendValue` indirection.
5354 override def appendFrom (row : Row , ordinal : Int ) {
54- doAppendFrom(row, ordinal)
55- }
56-
57- // Concrete `ColumnBuilder`s can override this method to append values
58- protected def doAppendFrom (row : Row , ordinal : Int )
59-
60- // Helper method to append primitive values (to avoid boxing cost)
61- protected def appendValue (v : JvmType ) {
62- buffer = ensureFreeSpace(buffer, columnType.actualSize(v))
63- columnType.append(v, buffer)
55+ val field = columnType.getField(row, ordinal)
56+ buffer = ensureFreeSpace(buffer, columnType.actualSize(field))
57+ columnType.append(field, buffer)
6458 }
6559
6660 override def build () = {
@@ -70,82 +64,41 @@ private[sql] abstract class BasicColumnBuilder[T <: DataType, JvmType] extends C
7064}
7165
// Base class for builders of native-typed columns. In the added (`+`) lines it takes a
// `ColumnStats[T]` plus a `NativeColumnType[T]`, forwards the column type to
// `BasicColumnBuilder`, and mixes in `NullableColumnBuilder`.
7266private [sql] abstract class NativeColumnBuilder [T <: NativeType ](
73- val columnType : NativeColumnType [T ])
74- extends BasicColumnBuilder [T , T # JvmType ]
75- with NullableColumnBuilder
67+ protected val columnStats : ColumnStats [T ],
68+ columnType : NativeColumnType [T ])
69+ extends BasicColumnBuilder [T , T # JvmType ](columnType)
70+ with NullableColumnBuilder {
7671
77- private [sql] class BooleanColumnBuilder extends NativeColumnBuilder (BOOLEAN ) {
78- override def doAppendFrom (row : Row , ordinal : Int ) {
79- appendValue(row.getBoolean(ordinal))
// Delegates statistics gathering for (`row`, `ordinal`) to `columnStats`.
72+ override def gatherStats (row : Row , ordinal : Int ) {
73+ columnStats.gatherStats(row, ordinal)
8074 }
8175}
8276
83- private [sql] class IntColumnBuilder extends NativeColumnBuilder (INT ) {
84- override def doAppendFrom (row : Row , ordinal : Int ) {
85- appendValue(row.getInt(ordinal))
86- }
87- }
// Native column builder constructed with a new `BooleanColumnStats` and the `BOOLEAN` column type.
77+ private [sql] class BooleanColumnBuilder
78+ extends NativeColumnBuilder (new BooleanColumnStats , BOOLEAN )
8879
89- private [sql] class ShortColumnBuilder extends NativeColumnBuilder (SHORT ) {
90- override def doAppendFrom (row : Row , ordinal : Int ) {
91- appendValue(row.getShort(ordinal))
92- }
93- }
// Native column builder constructed with a new `IntColumnStats` and the `INT` column type.
80+ private [sql] class IntColumnBuilder extends NativeColumnBuilder (new IntColumnStats , INT )
9481
95- private [sql] class LongColumnBuilder extends NativeColumnBuilder (LONG ) {
96- override def doAppendFrom (row : Row , ordinal : Int ) {
97- appendValue(row.getLong(ordinal))
98- }
99- }
// Native column builder constructed with a new `ShortColumnStats` and the `SHORT` column type.
82+ private [sql] class ShortColumnBuilder extends NativeColumnBuilder (new ShortColumnStats , SHORT )
10083
101- private [sql] class ByteColumnBuilder extends NativeColumnBuilder (BYTE ) {
102- override def doAppendFrom (row : Row , ordinal : Int ) {
103- appendValue(row.getByte(ordinal))
104- }
105- }
// Native column builder constructed with a new `LongColumnStats` and the `LONG` column type.
84+ private [sql] class LongColumnBuilder extends NativeColumnBuilder (new LongColumnStats , LONG )
10685
107- private [sql] class DoubleColumnBuilder extends NativeColumnBuilder (DOUBLE ) {
108- override def doAppendFrom (row : Row , ordinal : Int ) {
109- appendValue(row.getDouble(ordinal))
110- }
111- }
// Native column builder constructed with a new `ByteColumnStats` and the `BYTE` column type.
86+ private [sql] class ByteColumnBuilder extends NativeColumnBuilder (new ByteColumnStats , BYTE )
11287
113- private [sql] class FloatColumnBuilder extends NativeColumnBuilder (FLOAT ) {
114- override def doAppendFrom (row : Row , ordinal : Int ) {
115- appendValue(row.getFloat(ordinal))
116- }
117- }
// Native column builder constructed with a new `DoubleColumnStats` and the `DOUBLE` column type.
88+ private [sql] class DoubleColumnBuilder extends NativeColumnBuilder (new DoubleColumnStats , DOUBLE )
11889
119- private [sql] class StringColumnBuilder extends NativeColumnBuilder (STRING ) {
120- override def doAppendFrom (row : Row , ordinal : Int ) {
121- appendValue(row.getString(ordinal))
122- }
123- }
// Native column builder constructed with a new `FloatColumnStats` and the `FLOAT` column type.
90+ private [sql] class FloatColumnBuilder extends NativeColumnBuilder (new FloatColumnStats , FLOAT )
12491
125- private [sql] class BinaryColumnBuilder
126- extends BasicColumnBuilder [BinaryType .type , Array [Byte ]]
127- with NullableColumnBuilder {
128-
129- def columnType = BINARY
// Native column builder constructed with a new `StringColumnStates` and the `STRING` column type.
// NOTE(review): `StringColumnStates` is spelled differently from the sibling `*ColumnStats`
// classes used by the other builders; confirm it matches the declared class name.
92+ private [sql] class StringColumnBuilder extends NativeColumnBuilder (new StringColumnStates , STRING )
13093
131- override def doAppendFrom (row : Row , ordinal : Int ) {
132- appendValue(row(ordinal).asInstanceOf [Array [Byte ]])
133- }
134- }
// Builder for `Array[Byte]` values of `BinaryType`: a `BasicColumnBuilder` over the `BINARY`
// column type with `NullableColumnBuilder` mixed in. Unlike the native builders, it is not
// given a `ColumnStats` instance.
94+ private [sql] class BinaryColumnBuilder
95+ extends BasicColumnBuilder [BinaryType .type , Array [Byte ]](BINARY )
96+ with NullableColumnBuilder
13597
13698// TODO (lian) Add support for array, struct and map
// Builder typed as `BasicColumnBuilder[DataType, Array[Byte]]`. In the added (`+`) lines it
// passes the `GENERIC` column type to the base constructor, mixes in `NullableColumnBuilder`,
// and declares no body of its own. The removed (`-`) lines show the previous version, which
// serialized `row(ordinal)` with `SparkSqlSerializer` inside its own `doAppendFrom`.
13799private [sql] class GenericColumnBuilder
138- extends BasicColumnBuilder [DataType , Array [Byte ]]
139- with NullableColumnBuilder {
140-
141- def columnType = GENERIC
142-
143- override def doAppendFrom (row : Row , ordinal : Int ) {
144- val serialized = SparkSqlSerializer .serialize(row(ordinal))
145- buffer = ColumnBuilder .ensureFreeSpace(buffer, columnType.actualSize(serialized))
146- columnType.append(serialized, buffer)
147- }
148- }
100+ extends BasicColumnBuilder [DataType , Array [Byte ]](GENERIC )
101+ with NullableColumnBuilder
149102
150103private [sql] object ColumnBuilder {
151104 val DEFAULT_INITIAL_BUFFER_SIZE = 10 * 1024 * 104
0 commit comments