Skip to content

Commit 3c1ad7a

Browse files
committed
Added test suite for BooleanBitSet, refactored other test suites
1 parent 44fe4b2 commit 3c1ad7a

File tree

5 files changed

+183
-72
lines changed

5 files changed

+183
-72
lines changed

sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/compressionSchemes.scala

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -164,7 +164,7 @@ private[sql] case object DictionaryEncoding extends CompressionScheme {
164164
override val typeId = 2
165165

166166
// 32K unique values allowed
167-
private val MAX_DICT_SIZE = Short.MaxValue - 1
167+
val MAX_DICT_SIZE = Short.MaxValue
168168

169169
override def decoder[T <: NativeType](buffer: ByteBuffer, columnType: NativeColumnType[T]) = {
170170
new this.Decoder(buffer, columnType)
@@ -272,9 +272,7 @@ private[sql] case object DictionaryEncoding extends CompressionScheme {
272272
private[sql] case object BooleanBitSet extends CompressionScheme {
273273
override val typeId = 3
274274

275-
private val BITS_PER_LONG = 64
276-
277-
private var _uncompressedSize = 0
275+
val BITS_PER_LONG = 64
278276

279277
override def decoder[T <: NativeType](buffer: ByteBuffer, columnType: NativeColumnType[T]) = {
280278
new this.Decoder(buffer).asInstanceOf[compression.Decoder[T]]
@@ -285,11 +283,13 @@ private[sql] case object BooleanBitSet extends CompressionScheme {
285283
override def supports(columnType: ColumnType[_, _]) = columnType == BOOLEAN
286284

287285
class Encoder extends compression.Encoder[BooleanType.type] {
286+
private var _uncompressedSize = 0
287+
288288
override def gatherCompressibilityStats(
289289
value: Boolean,
290290
columnType: NativeColumnType[BooleanType.type]) {
291291

292-
_uncompressedSize += columnType.actualSize(value)
292+
_uncompressedSize += BOOLEAN.defaultSize
293293
}
294294

295295
override def compress(
@@ -301,7 +301,7 @@ private[sql] case object BooleanBitSet extends CompressionScheme {
301301
// Total element count (1 byte per Boolean value)
302302
.putInt(from.remaining)
303303

304-
while (from.remaining > BITS_PER_LONG) {
304+
while (from.remaining >= BITS_PER_LONG) {
305305
var word = 0: Long
306306
var i = 0
307307

@@ -344,7 +344,7 @@ private[sql] case object BooleanBitSet extends CompressionScheme {
344344
class Decoder(buffer: ByteBuffer) extends compression.Decoder[BooleanType.type] {
345345
private val count = buffer.getInt()
346346

347-
private var currentWord = if (count > 0) buffer.getLong() else 0: Long
347+
private var currentWord = 0: Long
348348

349349
private var visited: Int = 0
350350

@@ -356,7 +356,7 @@ private[sql] case object BooleanBitSet extends CompressionScheme {
356356
currentWord = buffer.getLong()
357357
}
358358

359-
((currentWord >> bit) & 1) > 0
359+
((currentWord >> bit) & 1) != 0
360360
}
361361

362362
override def hasNext: Boolean = visited < count
Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.sql.columnar.compression
19+
20+
import org.scalatest.FunSuite
21+
22+
import org.apache.spark.sql.Row
23+
import org.apache.spark.sql.columnar.{BOOLEAN, BooleanColumnStats}
24+
import org.apache.spark.sql.columnar.ColumnarTestUtils._
25+
26+
class BooleanBitSetSuite extends FunSuite {
27+
import BooleanBitSet._
28+
29+
def skeleton(count: Int) {
30+
// -------------
31+
// Tests encoder
32+
// -------------
33+
34+
val builder = TestCompressibleColumnBuilder(new BooleanColumnStats, BOOLEAN, BooleanBitSet)
35+
val rows = Seq.fill[Row](count)(makeRandomRow(BOOLEAN))
36+
val values = rows.map(_.head)
37+
38+
rows.foreach(builder.appendFrom(_, 0))
39+
val buffer = builder.build()
40+
41+
// Column type ID + null count + null positions
42+
val headerSize = CompressionScheme.columnHeaderSize(buffer)
43+
44+
// Compression scheme ID + element count + bitset words
45+
val compressedSize = 4 + 4 + {
46+
val extra = if (count % BITS_PER_LONG == 0) 0 else 1
47+
(count / BITS_PER_LONG + extra) * 8
48+
}
49+
50+
// 4 extra bytes for compression scheme type ID
51+
expectResult(headerSize + compressedSize, "Wrong buffer capacity")(buffer.capacity)
52+
53+
// Skips column header
54+
buffer.position(headerSize)
55+
expectResult(BooleanBitSet.typeId, "Wrong compression scheme ID")(buffer.getInt())
56+
expectResult(count, "Wrong element count")(buffer.getInt())
57+
58+
var word = 0: Long
59+
for (i <- 0 until count) {
60+
val bit = i % BITS_PER_LONG
61+
word = if (bit == 0) buffer.getLong() else word
62+
expectResult(values(i), s"Wrong value in compressed buffer, index=$i") {
63+
(word & ((1: Long) << bit)) != 0
64+
}
65+
}
66+
67+
// -------------
68+
// Tests decoder
69+
// -------------
70+
71+
// Rewinds, skips column header and 4 more bytes for compression scheme ID
72+
buffer.rewind().position(headerSize + 4)
73+
74+
val decoder = BooleanBitSet.decoder(buffer, BOOLEAN)
75+
values.foreach(expectResult(_, "Wrong decoded value")(decoder.next()))
76+
assert(!decoder.hasNext)
77+
}
78+
79+
test(s"$BooleanBitSet: empty") {
80+
skeleton(0)
81+
}
82+
83+
test(s"$BooleanBitSet: less than 1 word") {
84+
skeleton(BITS_PER_LONG - 1)
85+
}
86+
87+
test(s"$BooleanBitSet: exactly 1 word") {
88+
skeleton(BITS_PER_LONG)
89+
}
90+
91+
test(s"$BooleanBitSet: multiple whole words") {
92+
skeleton(BITS_PER_LONG * 2)
93+
}
94+
95+
test(s"$BooleanBitSet: multiple words and 1 more bit") {
96+
skeleton(BITS_PER_LONG * 2 + 1)
97+
}
98+
}

sql/core/src/test/scala/org/apache/spark/sql/columnar/compression/DictionaryEncodingSuite.scala

Lines changed: 65 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@ import org.scalatest.FunSuite
2424
import org.apache.spark.sql.catalyst.types.NativeType
2525
import org.apache.spark.sql.columnar._
2626
import org.apache.spark.sql.columnar.ColumnarTestUtils._
27-
import org.apache.spark.sql.catalyst.expressions.GenericMutableRow
2827

2928
class DictionaryEncodingSuite extends FunSuite {
3029
testDictionaryEncoding(new IntColumnStats, INT)
@@ -41,73 +40,82 @@ class DictionaryEncodingSuite extends FunSuite {
4140
(0 until buffer.getInt()).map(columnType.extract(buffer) -> _.toShort).toMap
4241
}
4342

44-
test(s"$DictionaryEncoding with $typeName: simple case") {
43+
def stableDistinct(seq: Seq[Int]): Seq[Int] = if (seq.isEmpty) {
44+
Seq.empty
45+
} else {
46+
seq.head +: seq.tail.filterNot(_ == seq.head)
47+
}
48+
49+
def skeleton(uniqueValueCount: Int, inputSeq: Seq[Int]) {
4550
// -------------
4651
// Tests encoder
4752
// -------------
4853

4954
val builder = TestCompressibleColumnBuilder(columnStats, columnType, DictionaryEncoding)
50-
val (values, rows) = makeUniqueValuesAndSingleValueRows(columnType, 2)
51-
52-
builder.initialize(0)
53-
builder.appendFrom(rows(0), 0)
54-
builder.appendFrom(rows(1), 0)
55-
builder.appendFrom(rows(0), 0)
56-
builder.appendFrom(rows(1), 0)
57-
58-
val buffer = builder.build()
59-
val headerSize = CompressionScheme.columnHeaderSize(buffer)
60-
// 4 extra bytes for dictionary size
61-
val dictionarySize = 4 + values.map(columnType.actualSize).sum
62-
// 4 `Short`s, 2 bytes each
63-
val compressedSize = dictionarySize + 2 * 4
64-
// 4 extra bytes for compression scheme type ID
65-
expectResult(headerSize + 4 + compressedSize, "Wrong buffer capacity")(buffer.capacity)
66-
67-
// Skips column header
68-
buffer.position(headerSize)
69-
expectResult(DictionaryEncoding.typeId, "Wrong compression scheme ID")(buffer.getInt())
70-
71-
val dictionary = buildDictionary(buffer)
72-
Array[Short](0, 1).foreach { i =>
73-
expectResult(i, "Wrong dictionary entry")(dictionary(values(i)))
74-
}
75-
76-
Array[Short](0, 1, 0, 1).foreach {
77-
expectResult(_, "Wrong column element value")(buffer.getShort())
55+
val (values, rows) = makeUniqueValuesAndSingleValueRows(columnType, uniqueValueCount)
56+
val dictValues = stableDistinct(inputSeq)
57+
58+
inputSeq.foreach(i => builder.appendFrom(rows(i), 0))
59+
60+
if (dictValues.length > DictionaryEncoding.MAX_DICT_SIZE) {
61+
withClue("Dictionary overflowed, compression should fail") {
62+
intercept[Throwable] {
63+
builder.build()
64+
}
65+
}
66+
} else {
67+
val buffer = builder.build()
68+
val headerSize = CompressionScheme.columnHeaderSize(buffer)
69+
// 4 extra bytes for dictionary size
70+
val dictionarySize = 4 + values.map(columnType.actualSize).sum
71+
// 2 bytes for each `Short`
72+
val compressedSize = 4 + dictionarySize + 2 * inputSeq.length
73+
// 4 extra bytes for compression scheme type ID
74+
expectResult(headerSize + compressedSize, "Wrong buffer capacity")(buffer.capacity)
75+
76+
// Skips column header
77+
buffer.position(headerSize)
78+
expectResult(DictionaryEncoding.typeId, "Wrong compression scheme ID")(buffer.getInt())
79+
80+
val dictionary = buildDictionary(buffer).toMap
81+
82+
dictValues.foreach { i =>
83+
expectResult(i, "Wrong dictionary entry") {
84+
dictionary(values(i))
85+
}
86+
}
87+
88+
inputSeq.foreach { i =>
89+
expectResult(i.toShort, "Wrong column element value")(buffer.getShort())
90+
}
91+
92+
// -------------
93+
// Tests decoder
94+
// -------------
95+
96+
// Rewinds, skips column header and 4 more bytes for compression scheme ID
97+
buffer.rewind().position(headerSize + 4)
98+
99+
val decoder = DictionaryEncoding.decoder(buffer, columnType)
100+
101+
inputSeq.foreach { i =>
102+
expectResult(values(i), "Wrong decoded value")(decoder.next())
103+
}
104+
105+
assert(!decoder.hasNext)
78106
}
79-
80-
// -------------
81-
// Tests decoder
82-
// -------------
83-
84-
// Rewinds, skips column header and 4 more bytes for compression scheme ID
85-
buffer.rewind().position(headerSize + 4)
86-
87-
val decoder = new DictionaryEncoding.Decoder[T](buffer, columnType)
88-
89-
Array[Short](0, 1, 0, 1).foreach { i =>
90-
expectResult(values(i), "Wrong decoded value")(decoder.next())
91-
}
92-
93-
assert(!decoder.hasNext)
94107
}
95-
}
96108

97-
test(s"$DictionaryEncoding: overflow") {
98-
val builder = TestCompressibleColumnBuilder(new IntColumnStats, INT, DictionaryEncoding)
99-
builder.initialize(0)
109+
test(s"$DictionaryEncoding with $typeName: empty") {
110+
skeleton(0, Seq.empty)
111+
}
100112

101-
(0 to Short.MaxValue).foreach { n =>
102-
val row = new GenericMutableRow(1)
103-
row.setInt(0, n)
104-
builder.appendFrom(row, 0)
113+
test(s"$DictionaryEncoding with $typeName: simple case") {
114+
skeleton(2, Seq(0, 1, 0, 1))
105115
}
106116

107-
withClue("Dictionary overflowed, encoding should fail") {
108-
intercept[Throwable] {
109-
builder.build()
110-
}
117+
test(s"$DictionaryEncoding with $typeName: dictionary overflow") {
118+
skeleton(DictionaryEncoding.MAX_DICT_SIZE + 1, 0 to DictionaryEncoding.MAX_DICT_SIZE)
111119
}
112120
}
113121
}

sql/core/src/test/scala/org/apache/spark/sql/columnar/compression/RunLengthEncodingSuite.scala

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -43,8 +43,6 @@ class RunLengthEncodingSuite extends FunSuite {
4343
// -------------
4444

4545
val builder = TestCompressibleColumnBuilder(columnStats, columnType, RunLengthEncoding)
46-
builder.initialize(0)
47-
4846
val (values, rows) = makeUniqueValuesAndSingleValueRows(columnType, uniqueValueCount)
4947
val inputSeq = inputRuns.flatMap { case (index, run) =>
5048
Seq.fill(run)(index)
@@ -56,13 +54,14 @@ class RunLengthEncodingSuite extends FunSuite {
5654
// Column type ID + null count + null positions
5755
val headerSize = CompressionScheme.columnHeaderSize(buffer)
5856

59-
// 4 extra bytes each run for run length
60-
val compressedSize = inputRuns.map { case (index, _) =>
57+
// Compression scheme ID + compressed contents
58+
val compressedSize = 4 + inputRuns.map { case (index, _) =>
59+
// 4 extra bytes each run for run length
6160
columnType.actualSize(values(index)) + 4
6261
}.sum
6362

6463
// 4 extra bytes for compression scheme type ID
65-
expectResult(headerSize + 4 + compressedSize, "Wrong buffer capacity")(buffer.capacity)
64+
expectResult(headerSize + compressedSize, "Wrong buffer capacity")(buffer.capacity)
6665

6766
// Skips column header
6867
buffer.position(headerSize)
@@ -80,7 +79,7 @@ class RunLengthEncodingSuite extends FunSuite {
8079
// Rewinds, skips column header and 4 more bytes for compression scheme ID
8180
buffer.rewind().position(headerSize + 4)
8281

83-
val decoder = new RunLengthEncoding.Decoder[T](buffer, columnType)
82+
val decoder = RunLengthEncoding.decoder(buffer, columnType)
8483

8584
inputSeq.foreach { i =>
8685
expectResult(values(i), "Wrong decoded value")(decoder.next())
@@ -89,6 +88,10 @@ class RunLengthEncodingSuite extends FunSuite {
8988
assert(!decoder.hasNext)
9089
}
9190

91+
test(s"$RunLengthEncoding with $typeName: empty column") {
92+
skeleton(0, Seq.empty)
93+
}
94+
9295
test(s"$RunLengthEncoding with $typeName: simple case") {
9396
skeleton(2, Seq(0 -> 2, 1 ->2))
9497
}

sql/core/src/test/scala/org/apache/spark/sql/columnar/compression/TestCompressibleColumnBuilder.scala

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,9 @@ object TestCompressibleColumnBuilder {
3737
columnType: NativeColumnType[T],
3838
scheme: CompressionScheme) = {
3939

40-
new TestCompressibleColumnBuilder(columnStats, columnType, Seq(scheme))
40+
val builder = new TestCompressibleColumnBuilder(columnStats, columnType, Seq(scheme))
41+
builder.initialize(0)
42+
builder
4143
}
4244
}
4345

0 commit comments

Comments
 (0)