diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
index 20ce01f4ce8cc..86eb47a70f1ad 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
@@ -772,6 +772,8 @@ private[execution] final class LongToUnsafeRowMap(val mm: TaskMemoryManager, cap
     array = readLongArray(readBuffer, length)
     val pageLength = readLong().toInt
     page = readLongArray(readBuffer, pageLength)
+    // Restore the cursor so that this map can be serialized again on executors.
+    cursor = pageLength * 8 + Platform.LONG_ARRAY_OFFSET
   }
 
   override def readExternal(in: ObjectInput): Unit = {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/HashedRelationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/HashedRelationSuite.scala
index 037cc2e3ccad7..d9b34dcd16476 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/HashedRelationSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/HashedRelationSuite.scala
@@ -278,6 +278,35 @@ class HashedRelationSuite extends SparkFunSuite with SharedSQLContext {
     map.free()
   }
 
+  test("SPARK-24809: serializing LongToUnsafeRowMap on an executor may result in data errors") {
+    val unsafeProj = UnsafeProjection.create(Array[DataType](LongType))
+    val originalMap = new LongToUnsafeRowMap(mm, 1)
+
+    val key1 = 1L
+    val value1 = 4852306286022334418L
+
+    val key2 = 2L
+    val value2 = 8813607448788216010L
+
+    originalMap.append(key1, unsafeProj(InternalRow(value1)))
+    originalMap.append(key2, unsafeProj(InternalRow(value2)))
+    originalMap.optimize()
+
+    val ser = sparkContext.env.serializer.newInstance()
+    // Simulate the map being serialized and deserialized twice, first on the
+    // driver and then again on an executor.
+    val firstTimeSerialized = ser.deserialize[LongToUnsafeRowMap](ser.serialize(originalMap))
+    val secondTimeSerialized =
+      ser.deserialize[LongToUnsafeRowMap](ser.serialize(firstTimeSerialized))
+
+    val resultRow = new UnsafeRow(1)
+    assert(secondTimeSerialized.getValue(key1, resultRow).getLong(0) === value1)
+    assert(secondTimeSerialized.getValue(key2, resultRow).getLong(0) === value2)
+
+    originalMap.free()
+    firstTimeSerialized.free()
+    secondTimeSerialized.free()
+  }
+
   test("Spark-14521") {
     val ser = new KryoSerializer(
       (new SparkConf).set("spark.kryo.referenceTracking", "false")).newInstance()
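
For context, below is a minimal self-contained Scala sketch of the invariant this patch restores. It is NOT Spark's actual LongToUnsafeRowMap: the object name CursorSketch and the methods writeSketch/readSketch are hypothetical stand-ins, and LONG_ARRAY_OFFSET stands in for Platform.LONG_ARRAY_OFFSET. The point it illustrates is that write() derives the number of used page words from the cursor rather than from page.length, so a map deserialized with a stale cursor silently serializes an empty page on the next round trip.

// A minimal sketch, not Spark's real class: CursorSketch, writeSketch and
// readSketch are made-up names used only to illustrate the cursor/page
// invariant that the patch above restores.
object CursorSketch {
  val LONG_ARRAY_OFFSET: Long = 16L // stand-in for Platform.LONG_ARRAY_OFFSET

  var page: Array[Long] = Array.empty
  var cursor: Long = LONG_ARRAY_OFFSET // initial value in a freshly built map

  // Serialization infers how many page words to emit from the cursor,
  // not from page.length, mirroring the production write() path.
  def writeSketch(): Array[Long] = {
    val usedWords = ((cursor - LONG_ARRAY_OFFSET) / 8).toInt
    page.take(usedWords)
  }

  def readSketch(data: Array[Long]): Unit = {
    page = data
    // The SPARK-24809 fix: without this line the cursor keeps its initial
    // value, usedWords computes to 0, and the next serialization round
    // silently drops the page contents.
    cursor = data.length * 8L + LONG_ARRAY_OFFSET
  }

  def main(args: Array[String]): Unit = {
    readSketch(Array(42L, 43L))       // deserialize two page words
    assert(writeSketch().length == 2) // re-serialization keeps both words
  }
}

This also explains why the regression test round-trips the map twice: the first serialization happens on the driver, where append() has left the cursor correct, so one round trip succeeds even without the fix. The data loss only appears when the already-deserialized map is serialized a second time, as an executor would do.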