From 735088693fd7d02dacaf2173a7c314099a9ee391 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Tue, 26 May 2015 10:25:01 -0700 Subject: [PATCH 1/6] Add failing regression test for SPARK-7873 --- .../apache/spark/serializer/KryoSerializerSuite.scala | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala index 8c384bd358ebc..d496d2c498c00 100644 --- a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala +++ b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala @@ -361,6 +361,17 @@ class KryoSerializerSuite extends FunSuite with SharedSparkContext { } } +class KryoSerializerAutoResetDisabledSuite extends FunSuite with SharedSparkContext { + conf.set("spark.serializer", classOf[KryoSerializer].getName) + conf.set("spark.kryo.registrator", classOf[RegistratorWithoutAutoReset].getName) + conf.set("spark.shuffle.manager", "sort") + conf.set("spark.shuffle.sort.bypassMergeThreshold", "200") + + test("sort-shuffle with bypassMergeSort (SPARK-7873)") { + val myObject = ("Hello", "World") + assert(sc.parallelize(Seq.fill(100)(myObject)).repartition(2).collect().toSet === Set(myObject)) + } +} class ClassLoaderTestingObject From 9816e8f829e1cb566183033acdf2222826e5bf89 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Tue, 26 May 2015 11:13:51 -0700 Subject: [PATCH 2/6] Add a failing test showing how deserialize() and deserializeStream() can interfere. --- .../serializer/KryoSerializerSuite.scala | 26 ++++++++++++++++++- 1 file changed, 25 insertions(+), 1 deletion(-) diff --git a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala index d496d2c498c00..ef50bc9438f95 100644 --- a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala +++ b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala @@ -17,7 +17,7 @@ package org.apache.spark.serializer -import java.io.ByteArrayOutputStream +import java.io.{ByteArrayInputStream, ByteArrayOutputStream} import scala.collection.mutable import scala.reflect.ClassTag @@ -364,6 +364,7 @@ class KryoSerializerSuite extends FunSuite with SharedSparkContext { class KryoSerializerAutoResetDisabledSuite extends FunSuite with SharedSparkContext { conf.set("spark.serializer", classOf[KryoSerializer].getName) conf.set("spark.kryo.registrator", classOf[RegistratorWithoutAutoReset].getName) + conf.set("spark.kryo.referenceTracking", "true") conf.set("spark.shuffle.manager", "sort") conf.set("spark.shuffle.sort.bypassMergeThreshold", "200") @@ -371,6 +372,29 @@ class KryoSerializerAutoResetDisabledSuite extends FunSuite with SharedSparkCont val myObject = ("Hello", "World") assert(sc.parallelize(Seq.fill(100)(myObject)).repartition(2).collect().toSet === Set(myObject)) } + + test("calling deserialize() after deserializeStream()") { + val serInstance = new KryoSerializer(conf).newInstance().asInstanceOf[KryoSerializerInstance] + assert(!serInstance.getAutoReset()) + val hello = "Hello" + val world = "World" + // Here, we serialize the same value twice, so the reference-tracking should cause us to store + // references to some of these values + val helloHello = serInstance.serialize((hello, hello)) + // Here's a stream which only contains one value + val worldWorld: Array[Byte] = { + val baos = new ByteArrayOutputStream() + val serStream = serInstance.serializeStream(baos) + serStream.writeObject(world) + serStream.writeObject(world) + serStream.close() + baos.toByteArray + } + val deserializationStream = serInstance.deserializeStream(new ByteArrayInputStream(worldWorld)) + assert(deserializationStream.readValue[Any]() === world) + deserializationStream.close() + assert(serInstance.deserialize[Any](helloHello) === (hello, hello)) + } } class ClassLoaderTestingObject From ab457ca6cf2e6ca4cd9def9d6655c345aa425f56 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Tue, 26 May 2015 12:45:08 -0700 Subject: [PATCH 3/6] Sketch a loan/release based solution. This makes it safe to invoke all SerializerInstance methods at any time, including the creation of multiple open OutputStreams from the same KryoSerializerInstance. --- .../spark/serializer/KryoSerializer.scala | 95 +++++++++++++++---- 1 file changed, 75 insertions(+), 20 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala index 217957963437d..de271a2e2e972 100644 --- a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala +++ b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala @@ -136,8 +136,12 @@ class KryoSerializer(conf: SparkConf) } private[spark] -class KryoSerializationStream(kryo: Kryo, outStream: OutputStream) extends SerializationStream { - val output = new KryoOutput(outStream) +class KryoSerializationStream( + serInstance: KryoSerializerInstance, + outStream: OutputStream) extends SerializationStream { + + private[this] var output: KryoOutput = new KryoOutput(outStream) + private[this] var kryo: Kryo = serInstance.borrowKryo() override def writeObject[T: ClassTag](t: T): SerializationStream = { kryo.writeClassAndObject(output, t) @@ -145,12 +149,24 @@ class KryoSerializationStream(kryo: Kryo, outStream: OutputStream) extends Seria } override def flush() { output.flush() } - override def close() { output.close() } + override def close() { + try { + output.close() + } finally { + serInstance.releaseKryo(kryo) + kryo = null + output = null + } + } } private[spark] -class KryoDeserializationStream(kryo: Kryo, inStream: InputStream) extends DeserializationStream { - private val input = new KryoInput(inStream) +class KryoDeserializationStream( + serInstance: KryoSerializerInstance, + inStream: InputStream) extends DeserializationStream { + + private[this] var input: KryoInput = new KryoInput(inStream) + private[this] var kryo: Kryo = serInstance.borrowKryo() override def readObject[T: ClassTag](): T = { try { @@ -163,13 +179,37 @@ class KryoDeserializationStream(kryo: Kryo, inStream: InputStream) extends Deser } override def close() { - // Kryo's Input automatically closes the input stream it is using. - input.close() + try { + // Kryo's Input automatically closes the input stream it is using. + input.close() + } finally { + serInstance.releaseKryo(kryo) + kryo = null + input = null + } } } private[spark] class KryoSerializerInstance(ks: KryoSerializer) extends SerializerInstance { - private val kryo = ks.newKryo() + + private[this] var cachedKryo: Kryo = ks.newKryo() + + private[spark] def borrowKryo(): Kryo = { + if (cachedKryo != null) { + val kryo = cachedKryo + kryo.reset() // We must reset in case this serializer instance was reused (see SPARK-7766) + cachedKryo = null + kryo + } else { + ks.newKryo() + } + } + + private[spark] def releaseKryo(kryo: Kryo): Unit = { + if (cachedKryo == null) { + cachedKryo = kryo + } + } // Make these lazy vals to avoid creating a buffer unless we use them private lazy val output = ks.newKryoOutput() @@ -177,38 +217,48 @@ private[spark] class KryoSerializerInstance(ks: KryoSerializer) extends Serializ override def serialize[T: ClassTag](t: T): ByteBuffer = { output.clear() - kryo.reset() // We must reset in case this serializer instance was reused (see SPARK-7766) + val kryo = borrowKryo() try { kryo.writeClassAndObject(output, t) } catch { case e: KryoException if e.getMessage.startsWith("Buffer overflow") => throw new SparkException(s"Kryo serialization failed: ${e.getMessage}. To avoid this, " + "increase spark.kryoserializer.buffer.max value.") + } finally { + releaseKryo(kryo) } ByteBuffer.wrap(output.toBytes) } override def deserialize[T: ClassTag](bytes: ByteBuffer): T = { - input.setBuffer(bytes.array) - kryo.readClassAndObject(input).asInstanceOf[T] + val kryo = borrowKryo() + try { + input.setBuffer(bytes.array) + kryo.readClassAndObject(input).asInstanceOf[T] + } finally { + releaseKryo(kryo) + } } override def deserialize[T: ClassTag](bytes: ByteBuffer, loader: ClassLoader): T = { + val kryo = borrowKryo() val oldClassLoader = kryo.getClassLoader - kryo.setClassLoader(loader) - input.setBuffer(bytes.array) - val obj = kryo.readClassAndObject(input).asInstanceOf[T] - kryo.setClassLoader(oldClassLoader) - obj + try { + kryo.setClassLoader(loader) + input.setBuffer(bytes.array) + kryo.readClassAndObject(input).asInstanceOf[T] + } finally { + kryo.setClassLoader(oldClassLoader) + releaseKryo(kryo) + } } override def serializeStream(s: OutputStream): SerializationStream = { - kryo.reset() // We must reset in case this serializer instance was reused (see SPARK-7766) - new KryoSerializationStream(kryo, s) + new KryoSerializationStream(this, s) } override def deserializeStream(s: InputStream): DeserializationStream = { - new KryoDeserializationStream(kryo, s) + new KryoDeserializationStream(this, s) } /** @@ -218,7 +268,12 @@ private[spark] class KryoSerializerInstance(ks: KryoSerializer) extends Serializ def getAutoReset(): Boolean = { val field = classOf[Kryo].getDeclaredField("autoReset") field.setAccessible(true) - field.get(kryo).asInstanceOf[Boolean] + val kryo = borrowKryo() + try { + field.get(kryo).asInstanceOf[Boolean] + } finally { + releaseKryo(kryo) + } } } From 3f1da968aa87810e12ff4465717d84f1cba336d9 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Tue, 26 May 2015 14:40:47 -0700 Subject: [PATCH 4/6] Guard against duplicate close() --- .../spark/serializer/KryoSerializer.scala | 40 ++++++++++++------- 1 file changed, 25 insertions(+), 15 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala index de271a2e2e972..55dd753adaac0 100644 --- a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala +++ b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala @@ -17,7 +17,7 @@ package org.apache.spark.serializer -import java.io.{EOFException, InputStream, OutputStream} +import java.io.{EOFException, IOException, InputStream, OutputStream} import java.nio.ByteBuffer import scala.reflect.ClassTag @@ -148,14 +148,22 @@ class KryoSerializationStream( this } - override def flush() { output.flush() } + override def flush() { + if (output == null) { + throw new IOException("Stream is closed") + } + output.flush() + } + override def close() { - try { - output.close() - } finally { - serInstance.releaseKryo(kryo) - kryo = null - output = null + if (output != null) { + try { + output.close() + } finally { + serInstance.releaseKryo(kryo) + kryo = null + output = null + } } } } @@ -179,13 +187,15 @@ class KryoDeserializationStream( } override def close() { - try { - // Kryo's Input automatically closes the input stream it is using. - input.close() - } finally { - serInstance.releaseKryo(kryo) - kryo = null - input = null + if (input != null) { + try { + // Kryo's Input automatically closes the input stream it is using. + input.close() + } finally { + serInstance.releaseKryo(kryo) + kryo = null + input = null + } } } } From ba55d20ba9c3768d04fb7b39c45a3123b66b8d6f Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Wed, 27 May 2015 11:51:51 -0700 Subject: [PATCH 5/6] Add explanatory comments --- .../spark/serializer/KryoSerializer.scala | 28 +++++++++++++++---- .../apache/spark/serializer/Serializer.scala | 5 ++++ 2 files changed, 28 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala index 55dd753adaac0..33909e5c696b3 100644 --- a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala +++ b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala @@ -19,6 +19,7 @@ package org.apache.spark.serializer import java.io.{EOFException, IOException, InputStream, OutputStream} import java.nio.ByteBuffer +import javax.annotation.Nullable import scala.reflect.ClassTag @@ -202,12 +203,24 @@ class KryoDeserializationStream( private[spark] class KryoSerializerInstance(ks: KryoSerializer) extends SerializerInstance { - private[this] var cachedKryo: Kryo = ks.newKryo() + /** + * A re-used [[Kryo]] instance. Methods will borrow this instance by calling `borrowKryo()`, do + * their work, then release the instance by calling `releaseKryo()`. Logically, this is a caching + * pool of size one. SerializerInstances are not thread-safe, hence accesses to this field are + * not synchronized. + */ + @Nullable private[this] var cachedKryo: Kryo = null - private[spark] def borrowKryo(): Kryo = { + /** + * Borrows a [[Kryo]] instance. If possible, this tries to re-use a cached Kryo instance; + * otherwise, it allocates a new instance. + */ + private[serializer] def borrowKryo(): Kryo = { if (cachedKryo != null) { val kryo = cachedKryo - kryo.reset() // We must reset in case this serializer instance was reused (see SPARK-7766) + // As a defensive measure, call reset() to clear any Kryo state that might have been modified + // by the last operation to borrow this instance (see SPARK-7766 for discussion of this issue) + kryo.reset() cachedKryo = null kryo } else { @@ -215,13 +228,18 @@ private[spark] class KryoSerializerInstance(ks: KryoSerializer) extends Serializ } } - private[spark] def releaseKryo(kryo: Kryo): Unit = { + /** + * Release a borrowed [[Kryo]] instance. If this serializer instance already has a cached Kryo + * instance, then the given Kryo instance is discarded; otherwise, the Kryo is stored for later + * re-use. + */ + private[serializer] def releaseKryo(kryo: Kryo): Unit = { if (cachedKryo == null) { cachedKryo = kryo } } - // Make these lazy vals to avoid creating a buffer unless we use them + // Make these lazy vals to avoid creating a buffer unless we use them. private lazy val output = ks.newKryoOutput() private lazy val input = new KryoInput() diff --git a/core/src/main/scala/org/apache/spark/serializer/Serializer.scala b/core/src/main/scala/org/apache/spark/serializer/Serializer.scala index 6078c9d433ebf..f1bdff96d3df1 100644 --- a/core/src/main/scala/org/apache/spark/serializer/Serializer.scala +++ b/core/src/main/scala/org/apache/spark/serializer/Serializer.scala @@ -19,6 +19,7 @@ package org.apache.spark.serializer import java.io._ import java.nio.ByteBuffer +import javax.annotation.concurrent.NotThreadSafe import scala.reflect.ClassTag @@ -114,8 +115,12 @@ object Serializer { /** * :: DeveloperApi :: * An instance of a serializer, for use by one thread at a time. + * + * It is legal to create multiple serialization / deserialization streams from the same + * SerializerInstance as long as those streams are all used within the same thread. */ @DeveloperApi +@NotThreadSafe abstract class SerializerInstance { def serialize[T: ClassTag](t: T): ByteBuffer From 00b402edb790a0d2eac8f6afda2a748563afccfc Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Wed, 27 May 2015 15:05:42 -0700 Subject: [PATCH 6/6] Initialize eagerly to fix a failing test --- .../main/scala/org/apache/spark/serializer/KryoSerializer.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala index 33909e5c696b3..3f909885dbd66 100644 --- a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala +++ b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala @@ -209,7 +209,7 @@ private[spark] class KryoSerializerInstance(ks: KryoSerializer) extends Serializ * pool of size one. SerializerInstances are not thread-safe, hence accesses to this field are * not synchronized. */ - @Nullable private[this] var cachedKryo: Kryo = null + @Nullable private[this] var cachedKryo: Kryo = borrowKryo() /** * Borrows a [[Kryo]] instance. If possible, this tries to re-use a cached Kryo instance;