Skip to content

Commit e19726d

Browse files
committed
Add fix for SPARK-7766.
1 parent 71845e3 commit e19726d

File tree

2 files changed

+4
-0
lines changed

2 files changed

+4
-0
lines changed

core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -177,6 +177,7 @@ private[spark] class KryoSerializerInstance(ks: KryoSerializer) extends Serializ
177177

178178
override def serialize[T: ClassTag](t: T): ByteBuffer = {
179179
output.clear()
180+
kryo.reset() // We must reset in case this serializer instance was reused (see SPARK-7766)
180181
try {
181182
kryo.writeClassAndObject(output, t)
182183
} catch {
@@ -202,6 +203,7 @@ private[spark] class KryoSerializerInstance(ks: KryoSerializer) extends Serializ
202203
}
203204

204205
override def serializeStream(s: OutputStream): SerializationStream = {
206+
kryo.reset() // We must reset in case this serializer instance was reused (see SPARK-7766)
205207
new KryoSerializationStream(kryo, s)
206208
}
207209

core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -345,6 +345,8 @@ class KryoSerializerSuite extends FunSuite with SharedSparkContext {
345345
assert (output1 === output2)
346346
}
347347

348+
// Regression test for SPARK-7766, an issue where disabling auto-reset and enabling
349+
// reference-tracking would lead to corrupted output when serializer instances are re-used
348350
for (referenceTracking <- Set(true, false); autoReset <- Set(true, false)) {
349351
test(s"instance reuse with autoReset = $autoReset, referenceTracking = $referenceTracking") {
350352
testSerializerInstanceReuse(autoReset = autoReset, referenceTracking = referenceTracking)

0 commit comments

Comments
 (0)