Skip to content

Commit f9d20b9

Browse files
scwfdrcrallen
authored and committed
[SPARK-12222] [CORE] Deserialize RoaringBitmap using Kryo serializer throw Buffer underflow exception
Jira: https://issues.apache.org/jira/browse/SPARK-12222 Deserializing a RoaringBitmap with the Kryo serializer throws a Buffer underflow exception: ``` com.esotericsoftware.kryo.KryoException: Buffer underflow. at com.esotericsoftware.kryo.io.Input.require(Input.java:156) at com.esotericsoftware.kryo.io.Input.skip(Input.java:131) at com.esotericsoftware.kryo.io.Input.skip(Input.java:264) ``` This is caused by a bug in kryo's `Input.skip(long count)` (EsotericSoftware/kryo#119), which we call in `KryoInputDataInputBridge`. Instead of upgrading kryo's version, this PR bypasses kryo's `Input.skip(long count)` by directly calling the other `skip` method in kryo's Input.java (https://github.com/EsotericSoftware/kryo/blob/kryo-2.21/src/com/esotericsoftware/kryo/io/Input.java#L124), i.e. it implements the bug-fixed version of `Input.skip(long count)` in KryoInputDataInputBridge's `skipBytes` method. For more detail, see apache#9748 (comment). Author: Fei Wang <[email protected]> Closes apache#10213 from scwf/patch-1.
1 parent 7f0ebdb commit f9d20b9

File tree

2 files changed

+36
-2
lines changed

2 files changed

+36
-2
lines changed

core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -368,7 +368,15 @@ private[serializer] class KryoInputDataInputBridge(input : KryoInput) extends Da
368368
// Reads a string by delegating to the wrapped Kryo input's readString().
override def readUTF(): String = input.readString() // readString in kryo does utf8
369369
// Reads an Int by delegating to the wrapped Kryo input's readInt().
override def readInt(): Int = input.readInt()
370370
// Reads an unsigned short, returned as an Int, via the Kryo input's readShortUnsigned().
override def readUnsignedShort(): Int = input.readShortUnsigned()
371-
override def skipBytes(n: Int): Int = input.skip(n.toLong).toInt
371+
/**
 * Skips `n` bytes of the wrapped Kryo input.
 *
 * The Int overload of `input.skip` is called on purpose: the commit message for SPARK-12222
 * attributes the "Buffer underflow" failure to kryo's `Input.skip(long count)`
 * (EsotericSoftware/kryo#119), so the Long overload must not be used here.
 *
 * Because `n` is an Int it can never exceed `Integer.MAX_VALUE`, so a single call is enough
 * and no chunking loop is required.
 *
 * @param n number of bytes to skip; values `<= 0` skip nothing
 * @return the number of bytes skipped: `n` when `n > 0`, otherwise 0
 */
override def skipBytes(n: Int): Int = {
  if (n <= 0) {
    // Nothing to skip; report 0 rather than echoing a negative request back to the caller.
    0
  } else {
    // `n` is statically an Int, so this resolves to Input.skip(int), not Input.skip(long).
    input.skip(n)
    n
  }
}
372380
// Fills `b` by delegating to input.read(b); the value returned by read is discarded.
// NOTE(review): if read can return fewer bytes than b.length, this would not meet the
// readFully contract - confirm against the Kryo Input documentation.
override def readFully(b: Array[Byte]): Unit = input.read(b)
373381
// Reads into `b` at `off` for `len` bytes via input.read(b, off, len); the returned count is
// discarded.
// NOTE(review): if read can return fewer than `len` bytes, this would not meet the readFully
// contract - confirm against the Kryo Input documentation.
override def readFully(b: Array[Byte], off: Int, len: Int): Unit = input.read(b, off, len)
374382
// Not supported by this bridge: always throws UnsupportedOperationException.
override def readLine(): String = throw new UnsupportedOperationException("readLine")

core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,16 +17,20 @@
1717

1818
package org.apache.spark.serializer
1919

20-
import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
20+
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, FileOutputStream, FileInputStream}
2121

2222
import scala.collection.mutable
2323
import scala.reflect.ClassTag
2424

2525
import com.esotericsoftware.kryo.Kryo
26+
import com.esotericsoftware.kryo.io.{Input => KryoInput, Output => KryoOutput}
27+
28+
import org.roaringbitmap.RoaringBitmap
2629

2730
import org.apache.spark.{SharedSparkContext, SparkConf, SparkFunSuite}
2831
import org.apache.spark.scheduler.HighlyCompressedMapStatus
2932
import org.apache.spark.serializer.KryoTest._
33+
import org.apache.spark.util.Utils
3034
import org.apache.spark.storage.BlockManagerId
3135

3236
class KryoSerializerSuite extends SparkFunSuite with SharedSparkContext {
@@ -319,6 +323,28 @@ class KryoSerializerSuite extends SparkFunSuite with SharedSparkContext {
319323
assert(thrown.getMessage.contains(kryoBufferMaxProperty))
320324
}
321325

326+
// Regression test for SPARK-12222: a RoaringBitmap written through KryoOutputDataOutputBridge
// must be readable back through KryoInputDataInputBridge from a file-backed Kryo input.
test("SPARK-12222: deserialize RoaringBitmap throw Buffer underflow exception") {
  val dir = Utils.createTempDir()
  try {
    val tmpfile = dir.toString + "/RoaringBitmap"

    val bitmap = new RoaringBitmap
    bitmap.add(1)
    bitmap.add(3)
    bitmap.add(5)

    // Write the bitmap to the temp file; close the Kryo output even if serialization fails.
    val output = new KryoOutput(new FileOutputStream(tmpfile))
    try {
      bitmap.serialize(new KryoOutputDataOutputBridge(output))
      output.flush()
    } finally {
      output.close()
    }

    // Read it back through the input bridge; close the Kryo input even if deserialization fails.
    val ret = new RoaringBitmap
    val input = new KryoInput(new FileInputStream(tmpfile))
    try {
      ret.deserialize(new KryoInputDataInputBridge(input))
    } finally {
      input.close()
    }

    assert(ret == bitmap)
  } finally {
    // Always remove the temp directory, including when the assertion above fails.
    Utils.deleteRecursively(dir)
  }
}
347+
322348
test("getAutoReset") {
323349
val ser = new KryoSerializer(new SparkConf).newInstance().asInstanceOf[KryoSerializerInstance]
324350
assert(ser.getAutoReset)

0 commit comments

Comments
 (0)