Skip to content

Commit c2f0821

Browse files
liyezhang556520srowen
authored andcommitted
[SPARK-7392] [CORE] bugfix: Kryo buffer size cannot be larger than 2M
Author: Zhang, Liye <[email protected]> Closes apache#5934 from liyezhang556520/kryoBufSize and squashes the following commits: 5707e04 [Zhang, Liye] fix import order 8693288 [Zhang, Liye] replace multiplier with ByteUnit methods 9bf93e9 [Zhang, Liye] add tests d91e5ed [Zhang, Liye] change kb to mb
1 parent f496bf3 commit c2f0821

File tree

2 files changed

+36
-5
lines changed

2 files changed

+36
-5
lines changed

core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ import org.apache.spark._
3232
import org.apache.spark.api.python.PythonBroadcast
3333
import org.apache.spark.broadcast.HttpBroadcast
3434
import org.apache.spark.network.nio.{GetBlock, GotBlock, PutBlock}
35+
import org.apache.spark.network.util.ByteUnit
3536
import org.apache.spark.scheduler.{CompressedMapStatus, HighlyCompressedMapStatus}
3637
import org.apache.spark.storage._
3738
import org.apache.spark.util.BoundedPriorityQueue
@@ -51,18 +52,18 @@ class KryoSerializer(conf: SparkConf)
5152

5253
private val bufferSizeKb = conf.getSizeAsKb("spark.kryoserializer.buffer", "64k")
5354

54-
if (bufferSizeKb >= 2048) {
55+
if (bufferSizeKb >= ByteUnit.GiB.toKiB(2)) {
5556
throw new IllegalArgumentException("spark.kryoserializer.buffer must be less than " +
56-
s"2048 mb, got: + $bufferSizeKb mb.")
57+
s"2048 mb, got: + ${ByteUnit.KiB.toMiB(bufferSizeKb)} mb.")
5758
}
58-
private val bufferSize = (bufferSizeKb * 1024).toInt
59+
private val bufferSize = ByteUnit.KiB.toBytes(bufferSizeKb).toInt
5960

6061
val maxBufferSizeMb = conf.getSizeAsMb("spark.kryoserializer.buffer.max", "64m").toInt
61-
if (maxBufferSizeMb >= 2048) {
62+
if (maxBufferSizeMb >= ByteUnit.GiB.toMiB(2)) {
6263
throw new IllegalArgumentException("spark.kryoserializer.buffer.max must be less than " +
6364
s"2048 mb, got: + $maxBufferSizeMb mb.")
6465
}
65-
private val maxBufferSize = maxBufferSizeMb * 1024 * 1024
66+
private val maxBufferSize = ByteUnit.MiB.toBytes(maxBufferSizeMb).toInt
6667

6768
private val referenceTracking = conf.getBoolean("spark.kryo.referenceTracking", true)
6869
private val registrationRequired = conf.getBoolean("spark.kryo.registrationRequired", false)

core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,36 @@ class KryoSerializerSuite extends FunSuite with SharedSparkContext {
3232
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
3333
conf.set("spark.kryo.registrator", classOf[MyRegistrator].getName)
3434

35+
test("configuration limits") {
36+
val conf1 = conf.clone()
37+
val kryoBufferProperty = "spark.kryoserializer.buffer"
38+
val kryoBufferMaxProperty = "spark.kryoserializer.buffer.max"
39+
conf1.set(kryoBufferProperty, "64k")
40+
conf1.set(kryoBufferMaxProperty, "64m")
41+
new KryoSerializer(conf1).newInstance()
42+
// 2048m = 2097152k
43+
conf1.set(kryoBufferProperty, "2097151k")
44+
conf1.set(kryoBufferMaxProperty, "64m")
45+
// should not throw exception when kryoBufferMaxProperty < kryoBufferProperty
46+
new KryoSerializer(conf1).newInstance()
47+
conf1.set(kryoBufferMaxProperty, "2097151k")
48+
new KryoSerializer(conf1).newInstance()
49+
val conf2 = conf.clone()
50+
conf2.set(kryoBufferProperty, "2048m")
51+
val thrown1 = intercept[IllegalArgumentException](new KryoSerializer(conf2).newInstance())
52+
assert(thrown1.getMessage.contains(kryoBufferProperty))
53+
val conf3 = conf.clone()
54+
conf3.set(kryoBufferMaxProperty, "2048m")
55+
val thrown2 = intercept[IllegalArgumentException](new KryoSerializer(conf3).newInstance())
56+
assert(thrown2.getMessage.contains(kryoBufferMaxProperty))
57+
val conf4 = conf.clone()
58+
conf4.set(kryoBufferProperty, "2g")
59+
conf4.set(kryoBufferMaxProperty, "3g")
60+
val thrown3 = intercept[IllegalArgumentException](new KryoSerializer(conf4).newInstance())
61+
assert(thrown3.getMessage.contains(kryoBufferProperty))
62+
assert(!thrown3.getMessage.contains(kryoBufferMaxProperty))
63+
}
64+
3565
test("basic types") {
3666
val ser = new KryoSerializer(conf).newInstance()
3767
def check[T: ClassTag](t: T) {

0 commit comments

Comments
 (0)