Commit c9ebfd0: "optimize code style."
Parent: 06a9547

File tree: 2 files changed, +13 -16 lines


sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala

Lines changed: 1 addition & 1 deletion
@@ -772,7 +772,7 @@ private[execution] final class LongToUnsafeRowMap(val mm: TaskMemoryManager, cap
     array = readLongArray(readBuffer, length)
     val pageLength = readLong().toInt
     page = readLongArray(readBuffer, pageLength)
-    // Set cursor because cursor is used in write function.
+    // Restore cursor variable to make this map able to be serialized again on executors.
     cursor = pageLength * 8 + Platform.LONG_ARRAY_OFFSET
   }
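Note on the reworded comment: in LongToUnsafeRowMap.read(), the page array is rebuilt, and a later write() uses cursor to decide how many bytes of page to serialize (cursor also marks where append() writes next). If cursor were left at its initial value after deserialization, re-serializing the map on an executor would emit an empty page, which is the data error tracked by SPARK-24809. A minimal sketch of that invariant, with simplified stream-based I/O standing in for Spark's actual TaskMemoryManager/Platform machinery (SketchMap and its method signatures are illustrative only):

import java.io.{DataInputStream, DataOutputStream}

// Minimal sketch of the cursor/page invariant in LongToUnsafeRowMap
// (simplified; the real class uses TaskMemoryManager and Platform offsets).
class SketchMap {
  private val longArrayOffset = 16 // stand-in for Platform.LONG_ARRAY_OFFSET
  private var page: Array[Long] = Array.empty
  private var cursor: Long = longArrayOffset

  // write() derives the number of used words from `cursor`, not from page.length
  def write(out: DataOutputStream): Unit = {
    val usedWords = ((cursor - longArrayOffset) / 8).toInt
    out.writeInt(usedWords)
    page.take(usedWords).foreach(out.writeLong)
  }

  def read(in: DataInputStream): Unit = {
    val pageLength = in.readInt()
    page = Array.fill(pageLength)(in.readLong())
    // The line whose comment this commit rewords: without restoring cursor,
    // a deserialized map would re-serialize zero words (SPARK-24809).
    cursor = pageLength * 8 + longArrayOffset
  }
}

In the sketch, calling read() and then write() round-trips the same words only because read() restores cursor; dropping that last line reproduces the empty second serialization.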

sql/core/src/test/scala/org/apache/spark/sql/execution/joins/HashedRelationSuite.scala

Lines changed: 12 additions & 15 deletions
@@ -278,37 +278,34 @@ class HashedRelationSuite extends SparkFunSuite with SharedSQLContext {
     map.free()
   }
 
-  test("SPARK-24809: Serializing LongHashedRelation in executor may result in data error") {
+  test("SPARK-24809: Serializing LongToUnsafeRowMap in executor may result in data error") {
     val unsafeProj = UnsafeProjection.create(Array[DataType](LongType))
     val originalMap = new LongToUnsafeRowMap(mm, 1)
 
     val key1 = 1L
-    val value1 = new Random().nextLong()
+    val value1 = 4852306286022334418L
 
     val key2 = 2L
-    val value2 = new Random().nextLong()
+    val value2 = 8813607448788216010L
 
     originalMap.append(key1, unsafeProj(InternalRow(value1)))
     originalMap.append(key2, unsafeProj(InternalRow(value2)))
     originalMap.optimize()
 
-    val resultRow = new UnsafeRow(1)
-    assert(originalMap.getValue(key1, resultRow).getLong(0) === value1)
-    assert(originalMap.getValue(key2, resultRow).getLong(0) === value2)
-
     val ser = new KryoSerializer(
       (new SparkConf).set("spark.kryo.referenceTracking", "false")).newInstance()
+    // Simulate serialize/deserialize twice on driver and executor
+    val firstTimeSerialized = ser.deserialize[LongToUnsafeRowMap](ser.serialize(originalMap))
+    val secondTimeSerialized =
+      ser.deserialize[LongToUnsafeRowMap](ser.serialize(firstTimeSerialized))
 
-    val mapSerializedInDriver = ser.deserialize[LongToUnsafeRowMap](ser.serialize(originalMap))
-    val mapSerializedInExecutor =
-      ser.deserialize[LongToUnsafeRowMap](ser.serialize(mapSerializedInDriver))
-
-    assert(mapSerializedInExecutor.getValue(key1, resultRow).getLong(0) === value1)
-    assert(mapSerializedInExecutor.getValue(key2, resultRow).getLong(0) === value2)
+    val resultRow = new UnsafeRow(1)
+    assert(secondTimeSerialized.getValue(key1, resultRow).getLong(0) === value1)
+    assert(secondTimeSerialized.getValue(key2, resultRow).getLong(0) === value2)
 
     originalMap.free()
-    mapSerializedInDriver.free()
-    mapSerializedInExecutor.free()
+    firstTimeSerialized.free()
+    secondTimeSerialized.free()
   }
 
   test("Spark-14521") {
