
Commit 0f98eb2

liutang123 authored and rdblue committed
[SPARK-24809][SQL] Serializing LongToUnsafeRowMap in executor may result in data error
When the join key is a long or an int in a broadcast join, Spark uses `LongToUnsafeRowMap` to store the key-value pairs of the table that will be broadcast. But when the `LongToUnsafeRowMap` is broadcast to executors and is too big to hold in memory, it is spilled to disk. At that point, because `write` uses the variable `cursor` to determine how many bytes of the map's `page` to write out, and `cursor` was not restored when deserializing, the executor wrote nothing from the page to disk.

## What changes were proposed in this pull request?

Restore the cursor value when deserializing.

Author: liulijia <[email protected]>

Closes apache#21772 from liutang123/SPARK-24809.
1 parent c227732 commit 0f98eb2

File tree

2 files changed: +31 −0 lines changed


sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala

Lines changed: 2 additions & 0 deletions
@@ -741,6 +741,8 @@ private[execution] final class LongToUnsafeRowMap(val mm: TaskMemoryManager, cap
     array = readLongArray(readBuffer, length)
     val pageLength = readLong().toInt
     page = readLongArray(readBuffer, pageLength)
+    // Restore cursor variable to make this map able to be serialized again on executors.
+    cursor = pageLength * 8 + Platform.LONG_ARRAY_OFFSET
   }
 
   override def readExternal(in: ObjectInput): Unit = {
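For context, the restored value is the inverse of what the write path computes: `write` derives the number of used longs in `page` from `cursor`, so after deserialization `cursor` must again point just past the last used byte of the page; otherwise a second serialization on an executor (for example when the broadcast map spills to disk) writes out an empty page. Below is a minimal, self-contained Scala sketch of that round-trip invariant, using hypothetical names and a stand-in offset constant rather than the actual `LongToUnsafeRowMap` code:

```scala
// Sketch only: illustrates the cursor/page invariant described in the commit message,
// not the real LongToUnsafeRowMap implementation.
object CursorRoundTripSketch {
  val LONG_ARRAY_OFFSET = 16L // stand-in for Platform.LONG_ARRAY_OFFSET

  final class TinyMap {
    var page: Array[Long] = Array(1L, 2L, 3L)
    // cursor points just past the last used byte of `page`
    var cursor: Long = page.length * 8 + LONG_ARRAY_OFFSET

    def write(): Array[Long] = {
      // The number of longs to emit is derived from cursor, not from page.length.
      val used = ((cursor - LONG_ARRAY_OFFSET) / 8).toInt
      page.take(used)
    }

    def read(serialized: Array[Long]): Unit = {
      page = serialized
      // The fix: restore cursor so that a later write() still sees the full page.
      cursor = serialized.length * 8 + LONG_ARRAY_OFFSET
    }
  }

  def main(args: Array[String]): Unit = {
    val driverSide = new TinyMap
    val executorSide = new TinyMap
    executorSide.read(driverSide.write()) // deserialize on the executor
    // Re-serialize on the executor (e.g. when spilling to disk): no data is lost.
    assert(executorSide.write().sameElements(driverSide.page))
  }
}
```

Before this commit, a freshly deserialized map behaved as if `cursor` were still at the base offset, so `write` computed a used length of zero and the page contents were silently dropped.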

sql/core/src/test/scala/org/apache/spark/sql/execution/joins/HashedRelationSuite.scala

Lines changed: 29 additions & 0 deletions
@@ -277,6 +277,35 @@ class HashedRelationSuite extends SparkFunSuite with SharedSQLContext {
     map.free()
   }
 
+  test("SPARK-24809: Serializing LongToUnsafeRowMap in executor may result in data error") {
+    val unsafeProj = UnsafeProjection.create(Array[DataType](LongType))
+    val originalMap = new LongToUnsafeRowMap(mm, 1)
+
+    val key1 = 1L
+    val value1 = 4852306286022334418L
+
+    val key2 = 2L
+    val value2 = 8813607448788216010L
+
+    originalMap.append(key1, unsafeProj(InternalRow(value1)))
+    originalMap.append(key2, unsafeProj(InternalRow(value2)))
+    originalMap.optimize()
+
+    val ser = sparkContext.env.serializer.newInstance()
+    // Simulate serialize/deserialize twice on driver and executor
+    val firstTimeSerialized = ser.deserialize[LongToUnsafeRowMap](ser.serialize(originalMap))
+    val secondTimeSerialized =
+      ser.deserialize[LongToUnsafeRowMap](ser.serialize(firstTimeSerialized))
+
+    val resultRow = new UnsafeRow(1)
+    assert(secondTimeSerialized.getValue(key1, resultRow).getLong(0) === value1)
+    assert(secondTimeSerialized.getValue(key2, resultRow).getLong(0) === value2)
+
+    originalMap.free()
+    firstTimeSerialized.free()
+    secondTimeSerialized.free()
+  }
+
   test("Spark-14521") {
     val ser = new KryoSerializer(
       (new SparkConf).set("spark.kryo.referenceTracking", "false")).newInstance()
