
Commit 95b3301

Fixed bugs in IntegralDelta
1 parent 7b4203a commit 95b3301


2 files changed: +19 −16 lines changed


sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/compressionSchemes.scala

Lines changed: 8 additions & 12 deletions
@@ -396,26 +396,27 @@ private[sql] sealed abstract class IntegralDelta[I <: IntegralType] extends Comp
 
       if (initial) {
         initial = false
-        prev = value
         _compressedSize += 1 + columnType.defaultSize
       } else {
         val (smallEnough, _) = byteSizedDelta(value, prev)
         _compressedSize += (if (smallEnough) 1 else 1 + columnType.defaultSize)
       }
+
+      prev = value
     }
 
     override def compress(from: ByteBuffer, to: ByteBuffer, columnType: NativeColumnType[I]) = {
       to.putInt(typeId)
 
       if (from.hasRemaining) {
-        val prev = columnType.extract(from)
-
+        var prev = columnType.extract(from)
         to.put(Byte.MinValue)
         columnType.append(prev, to)
 
         while (from.hasRemaining) {
           val current = columnType.extract(from)
           val (smallEnough, delta) = byteSizedDelta(current, prev)
+          prev = current
 
           if (smallEnough) {
             to.put(delta)
@@ -442,13 +443,8 @@ private[sql] sealed abstract class IntegralDelta[I <: IntegralType] extends Comp
 
     override def next() = {
       val delta = buffer.get()
-
-      if (delta > Byte.MinValue) {
-        addDelta(prev, delta)
-      } else {
-        prev = columnType.extract(buffer)
-        prev
-      }
+      prev = if (delta > Byte.MinValue) addDelta(prev, delta) else columnType.extract(buffer)
+      prev
     }
 
     override def hasNext = buffer.hasRemaining
@@ -464,7 +460,7 @@ private[sql] case object IntDelta extends IntegralDelta[IntegerType.type] {
 
   override protected def byteSizedDelta(x: Int, y: Int): (Boolean, Byte) = {
     val delta = x - y
-    if (delta < Byte.MaxValue) (true, delta.toByte) else (false, 0: Byte)
+    if (math.abs(delta) <= Byte.MaxValue) (true, delta.toByte) else (false, 0: Byte)
   }
 }
 
@@ -477,6 +473,6 @@ private[sql] case object LongDelta extends IntegralDelta[LongType.type] {
 
   override protected def byteSizedDelta(x: Long, y: Long): (Boolean, Byte) = {
     val delta = x - y
-    if (delta < Byte.MaxValue) (true, delta.toByte) else (false, 0: Byte)
+    if (math.abs(delta) <= Byte.MaxValue) (true, delta.toByte) else (false, 0: Byte)
   }
 }
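
Note: taken together, the changes above fix three related bugs in IntegralDelta. The encoder (both gatherCompressibilityStats and compress) never advanced prev past the first value, so every delta was computed against the initial element; the decoder's next() did not update prev after applying a byte-sized delta; and byteSizedDelta's old check (delta < Byte.MaxValue) accepted every negative delta, so deltas below -127 silently wrapped on .toByte, and a delta of exactly -128 collided with the Byte.MinValue escape mark. The new check restricts byte-sized deltas to [-127, 127].

Below is a minimal standalone sketch of the corrected scheme, using hypothetical names (DeltaSketch, encode, decode) rather than the Spark classes: the first value, and any value whose delta does not fit in a byte, is written in full behind an escape mark; everything else becomes a single signed byte.

// A minimal standalone sketch of the corrected scheme (hypothetical names,
// not the Spark classes). Byte.MinValue (-128) is reserved as the escape
// mark, so only deltas in [-Byte.MaxValue, Byte.MaxValue] are byte-sized.
object DeltaSketch {
  // Left(delta) is a single-byte delta; Right(value) is an escaped full value.
  def encode(input: Seq[Int]): Seq[Either[Byte, Int]] = {
    var prev = 0
    var initial = true
    input.map { current =>
      val encoded =
        if (initial) {
          initial = false
          Right(current) // the first value is always written in full
        } else {
          val delta = current - prev
          if (math.abs(delta) <= Byte.MaxValue) Left(delta.toByte)
          else Right(current) // delta too large: escape and write in full
        }
      prev = current // the fixed behavior: prev advances on every value
      encoded
    }
  }

  def decode(encoded: Seq[Either[Byte, Int]]): Seq[Int] = {
    var prev = 0
    encoded.map { e =>
      // the fixed behavior: prev is updated on both branches
      prev = e.fold(delta => prev + delta, identity)
      prev
    }
  }

  def main(args: Array[String]): Unit = {
    val input = Seq(2, 1, 2, 130) // the suite's new "simple case"
    assert(decode(encode(input)) == input)
    println(encode(input)) // List(Right(2), Left(-1), Left(1), Right(130))
  }
}

For the suite's new simple case, encode(Seq(2, 1, 2, 130)) yields Right(2), Left(-1), Left(1), Right(130): an escaped first value, a negative delta, a positive delta, and an escaped 130 (since 130 - 2 = 128 exceeds Byte.MaxValue), and decode round-trips the input.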

sql/core/src/test/scala/org/apache/spark/sql/columnar/compression/IntegralDeltaSuite.scala

Lines changed: 11 additions & 4 deletions
@@ -22,6 +22,7 @@ import org.scalatest.FunSuite
 import org.apache.spark.sql.catalyst.expressions.GenericMutableRow
 import org.apache.spark.sql.catalyst.types.IntegralType
 import org.apache.spark.sql.columnar._
+import org.apache.spark.sql.columnar.ColumnarTestUtils._
 
 class IntegralDeltaSuite extends FunSuite {
   testIntegralDelta(new IntColumnStats, INT, IntDelta)
@@ -63,7 +64,7 @@ class IntegralDeltaSuite extends FunSuite {
     } else {
       val oneBoolean = columnType.defaultSize
       1 + oneBoolean + deltas.map {
-        d => if (math.abs(d) < Byte.MaxValue) 1 else 1 + oneBoolean
+        d => if (math.abs(d) <= Byte.MaxValue) 1 else 1 + oneBoolean
       }.sum
     })
 
@@ -78,7 +79,7 @@ class IntegralDeltaSuite extends FunSuite {
     expectResult(input.head, "The first value is wrong")(columnType.extract(buffer))
 
     (input.tail, deltas).zipped.foreach { (value, delta) =>
-      if (delta < Byte.MaxValue) {
+      if (math.abs(delta) <= Byte.MaxValue) {
         expectResult(delta, "Wrong delta")(buffer.get())
       } else {
         expectResult(Byte.MinValue, "Expecting escaping mark here")(buffer.get())
@@ -105,11 +106,17 @@ class IntegralDeltaSuite extends FunSuite {
 
     test(s"$scheme: simple case") {
       val input = columnType match {
-        case INT => Seq(1: Int, 2: Int, 130: Int)
-        case LONG => Seq(1: Long, 2: Long, 130: Long)
+        case INT => Seq(2: Int, 1: Int, 2: Int, 130: Int)
+        case LONG => Seq(2: Long, 1: Long, 2: Long, 130: Long)
      }
 
       skeleton(input.map(_.asInstanceOf[I#JvmType]))
     }
+
+    test(s"$scheme: long random series") {
+      // Have to workaround with `Any` since no `ClassTag[I#JvmType]` available here.
+      val input = Array.fill[Any](10000)(makeRandomValue(columnType))
+      skeleton(input.map(_.asInstanceOf[I#JvmType]))
+    }
   }
 }
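
Note: on the test side, the reordered simple case (2, 1, 2, 130 instead of 1, 2, 130) now begins with a decreasing step, so it covers a negative delta as well as an escaped value, and the new 10,000-element random series exercises prev tracking across both the delta and escape paths. As a hedged illustration of the byte layout the suite's assertions walk through (ignoring the 4-byte type ID that compress writes first; SimpleCaseLayout is a hypothetical name, not part of the suite), the IntDelta simple case compresses to 12 bytes:

import java.nio.ByteBuffer

// Hand-built expectation for IntDelta on Seq(2, 1, 2, 130), mirroring what
// the suite's assertions read back (a sketch, not the suite itself).
object SimpleCaseLayout extends App {
  val buffer = ByteBuffer.allocate(16)
  buffer.put(Byte.MinValue).putInt(2)   // escape mark + full first value
  buffer.put((-1).toByte)               // delta: 1 - 2
  buffer.put(1.toByte)                  // delta: 2 - 1
  buffer.put(Byte.MinValue).putInt(130) // escaped: 130 - 2 = 128 > Byte.MaxValue
  println(s"compressed body: ${buffer.position()} bytes") // 12
}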
