From 07324450f3aecfdb335a85cfb8631eb45e4eb02b Mon Sep 17 00:00:00 2001
From: Koert Kuipers
Date: Sun, 11 May 2014 14:10:10 -0400
Subject: [PATCH 1/4] support setting maxCapacity to something different than
 capacity in kryo Output

---
 .../scala/org/apache/spark/serializer/KryoSerializer.scala | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
index 926e71573be32..a2a3cc5f49f95 100644
--- a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
+++ b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
@@ -39,11 +39,12 @@ class KryoSerializer(conf: SparkConf)
   with Logging
   with Serializable {
 
-  private val bufferSize = conf.getInt("spark.kryoserializer.buffer.mb", 2) * 1024 * 1024
+  private val bufferSizeMb = conf.getInt("spark.kryoserializer.buffer.mb", 2)
+  private val maxBufferSizeMb = conf.getInt("spark.kryoserializer.buffer.max.mb", bufferSizeMb)
   private val referenceTracking = conf.getBoolean("spark.kryo.referenceTracking", true)
   private val registrator = conf.getOption("spark.kryo.registrator")
 
-  def newKryoOutput() = new KryoOutput(bufferSize)
+  def newKryoOutput() = new KryoOutput(bufferSizeMb * 1024 * 1024, maxBufferSizeMb * 1024 * 1024)
 
   def newKryo(): Kryo = {
     val instantiator = new EmptyScalaKryoInstantiator

From 143ec4d9a994f0b4e09d87311142c290504d9abb Mon Sep 17 00:00:00 2001
From: Koert Kuipers
Date: Sun, 11 May 2014 18:07:26 -0400
Subject: [PATCH 2/4] test resizable buffer in kryo Output

---
 .../serializer/KryoSerializerSuite.scala      | 29 +++++++++++++++++++
 1 file changed, 29 insertions(+)

diff --git a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
index 5d4673aebe9e8..85f9bf79618e6 100644
--- a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
@@ -193,6 +193,35 @@ class KryoSerializerSuite extends FunSuite with SharedSparkContext {
   }
 }
 
+class KryoSerializerResizableOutputSuite extends FunSuite {
+  import org.apache.spark.SparkConf
+  import org.apache.spark.SparkContext
+  import org.apache.spark.LocalSparkContext
+  import org.apache.spark.SparkException
+
+  // trial and error showed this will not serialize with 1mb buffer
+  val x = (1 to 400000).toArray
+
+  test("kryo without resizable output buffer should fail on large array") {
+    val conf = new SparkConf(false)
+    conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
+    conf.set("spark.kryoserializer.buffer.mb", "1")
+    val sc = new SparkContext("local", "test", conf)
+    intercept[SparkException](sc.parallelize(x).collect)
+    LocalSparkContext.stop(sc)
+  }
+
+  test("kryo with resizable output buffer should succeed on large array") {
+    val conf = new SparkConf(false)
+    conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
+    conf.set("spark.kryoserializer.buffer.mb", "1")
+    conf.set("spark.kryoserializer.buffer.max.mb", "2")
+    val sc = new SparkContext("local", "test", conf)
+    assert(sc.parallelize(x).collect === x)
+    LocalSparkContext.stop(sc)
+  }
+}
+
 object KryoTest {
 
   case class CaseClass(i: Int, s: String) {}

From 0c9f8eb453748fa9c5f40649d4fda5821f71ac53 Mon Sep 17 00:00:00 2001
From: Koert Kuipers
Date: Wed, 14 May 2014 00:10:22 -0400
Subject: [PATCH 3/4] make default for kryo max buffer size 16MB
---
 .../scala/org/apache/spark/serializer/KryoSerializer.scala | 6 +++---
 .../org/apache/spark/serializer/KryoSerializerSuite.scala  | 1 +
 2 files changed, 4 insertions(+), 3 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
index a2a3cc5f49f95..5352f03701f75 100644
--- a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
+++ b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
@@ -39,12 +39,12 @@ class KryoSerializer(conf: SparkConf)
   with Logging
   with Serializable {
 
-  private val bufferSizeMb = conf.getInt("spark.kryoserializer.buffer.mb", 2)
-  private val maxBufferSizeMb = conf.getInt("spark.kryoserializer.buffer.max.mb", bufferSizeMb)
+  private val bufferSize = conf.getInt("spark.kryoserializer.buffer.mb", 2) * 1024 * 1024
+  private val maxBufferSize = conf.getInt("spark.kryoserializer.buffer.max.mb", 16) * 1024 * 1024
   private val referenceTracking = conf.getBoolean("spark.kryo.referenceTracking", true)
   private val registrator = conf.getOption("spark.kryo.registrator")
 
-  def newKryoOutput() = new KryoOutput(bufferSizeMb * 1024 * 1024, maxBufferSizeMb * 1024 * 1024)
+  def newKryoOutput() = new KryoOutput(bufferSize, math.max(bufferSize, maxBufferSize))
 
   def newKryo(): Kryo = {
     val instantiator = new EmptyScalaKryoInstantiator
diff --git a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
index 85f9bf79618e6..f28bc7b47787a 100644
--- a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
@@ -206,6 +206,7 @@ class KryoSerializerResizableOutputSuite extends FunSuite {
     val conf = new SparkConf(false)
     conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
     conf.set("spark.kryoserializer.buffer.mb", "1")
+    conf.set("spark.kryoserializer.buffer.max.mb", "1")
     val sc = new SparkContext("local", "test", conf)
     intercept[SparkException](sc.parallelize(x).collect)
     LocalSparkContext.stop(sc)

From 15f6d81c411d3cea641169a898c431f56a21df91 Mon Sep 17 00:00:00 2001
From: Koert Kuipers
Date: Sat, 21 Jun 2014 13:11:49 -0400
Subject: [PATCH 4/4] change default for spark.kryoserializer.buffer.max.mb to
 64mb and add some documentation

---
 .../apache/spark/serializer/KryoSerializer.scala |  2 +-
 docs/configuration.md                            | 15 +++++++++++----
 2 files changed, 12 insertions(+), 5 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
index cf0c495c6c2c0..e62f3415b4a5e 100644
--- a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
+++ b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
@@ -46,7 +46,7 @@ class KryoSerializer(conf: SparkConf)
   with Serializable {
 
   private val bufferSize = conf.getInt("spark.kryoserializer.buffer.mb", 2) * 1024 * 1024
-  private val maxBufferSize = conf.getInt("spark.kryoserializer.buffer.max.mb", 16) * 1024 * 1024
+  private val maxBufferSize = conf.getInt("spark.kryoserializer.buffer.max.mb", 64) * 1024 * 1024
   private val referenceTracking = conf.getBoolean("spark.kryo.referenceTracking", true)
   private val registrator = conf.getOption("spark.kryo.registrator")
 
diff --git a/docs/configuration.md b/docs/configuration.md
index b84104cc7e653..c7949c0a115b9 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -385,10 +385,17 @@ Apart from these, the following properties are also available, and may be useful
 <tr>
   <td>spark.kryoserializer.buffer.mb</td>
   <td>2</td>
   <td>
-    Maximum object size to allow within Kryo (the library needs to create a buffer at least as
-    large as the largest single object you'll serialize). Increase this if you get a "buffer limit
-    exceeded" exception inside Kryo. Note that there will be one buffer per core on each
-    worker.
+    Object size to allow within Kryo using default (pre-allocated) buffers (the library needs to create
+    a buffer at least as large as the largest single object you'll serialize). Note that there will be
+    one buffer per core on each worker.
+  </td>
+</tr>
+<tr>
+  <td>spark.kryoserializer.buffer.max.mb</td>
+  <td>64</td>
+  <td>
+    Maximum object size to allow within Kryo by resizing buffers as needed (which has some overhead).
+    Increase this if you get a "buffer limit exceeded" exception inside Kryo.
   </td>
 </tr>
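
Usage sketch (not part of the patch series above): a minimal example of how an application could opt into the resizable output buffer once these patches are applied, assuming the property names and the 64mb default from patch 4/4. The 400000-element array mirrors the test added in patch 2/4; it is simply an object known from trial and error to overflow a 1mb pre-allocated buffer. The application and master names are placeholders.

import org.apache.spark.{SparkConf, SparkContext}

// Start with a small pre-allocated Kryo output buffer, but allow it to grow
// up to 64mb before a "buffer limit exceeded" error is raised.
val conf = new SparkConf()
  .setMaster("local")
  .setAppName("kryo-resizable-buffer-example")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryoserializer.buffer.mb", "1")      // initial (pre-allocated) size per buffer
  .set("spark.kryoserializer.buffer.max.mb", "64") // upper bound when the buffer is resized

val sc = new SparkContext(conf)
// Too large to fit in a 1mb buffer, as in the test suite added in patch 2/4.
val large = (1 to 400000).toArray
assert(sc.parallelize(large).collect().sameElements(large))
sc.stop()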