From a72fe61863e119c0e902cef3054d9140b6d04f77 Mon Sep 17 00:00:00 2001
From: liulijia
Date: Sun, 15 Jul 2018 19:24:55 +0800
Subject: [PATCH 1/5] [SPARK-24809] [SQL] Serializing LongHashedRelation in executor may result in data error

---
 .../spark/sql/execution/joins/HashedRelation.scala | 9 ++++++---
 1 file changed, 6 insertions(+), 3 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
index 20ce01f4ce8cc..9943596d2c437 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
@@ -726,8 +726,9 @@ private[execution] final class LongToUnsafeRowMap(val mm: TaskMemoryManager, cap
 
     writeLong(array.length)
     writeLongArray(writeBuffer, array, array.length)
-    val used = ((cursor - Platform.LONG_ARRAY_OFFSET) / 8).toInt
-    writeLong(used)
+    val cursorFlag = cursor - Platform.LONG_ARRAY_OFFSET
+    writeLong(cursorFlag)
+    val used = (cursorFlag / 8).toInt
     writeLongArray(writeBuffer, page, used)
   }
 
@@ -770,7 +771,9 @@ private[execution] final class LongToUnsafeRowMap(val mm: TaskMemoryManager, cap
     val length = readLong().toInt
     mask = length - 2
     array = readLongArray(readBuffer, length)
-    val pageLength = readLong().toInt
+    val cursorFlag = readLong()
+    cursor = cursorFlag + Platform.LONG_ARRAY_OFFSET
+    val pageLength = (cursorFlag / 8).toInt
     page = readLongArray(readBuffer, pageLength)
   }
 
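Why this fix works: before the patch, read() never restored `cursor`, so a deserialized map kept the field's initial value (Platform.LONG_ARRAY_OFFSET). If that map was then serialized again -- which happens when an executor re-ships the broadcast relation -- write() computed `used = (cursor - Platform.LONG_ARRAY_OFFSET) / 8 = 0` and silently dropped the entire page. The following minimal, self-contained Scala sketch reproduces the mechanics; TinyMap, OFFSET, and the stream plumbing are invented stand-ins, not Spark code:

import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}

// Invented stand-in for LongToUnsafeRowMap's page/cursor bookkeeping;
// OFFSET plays the role of Platform.LONG_ARRAY_OFFSET.
object CursorRoundTrip {
  val OFFSET = 16L

  class TinyMap {
    var page: Array[Long] = Array(42L, 43L)        // payload words
    var cursor: Long = OFFSET + page.length * 8    // advances as words are appended

    def write(out: DataOutputStream): Unit = {
      // The number of used words is derived from cursor, not from page.length.
      val used = ((cursor - OFFSET) / 8).toInt
      out.writeInt(used)
      page.take(used).foreach(out.writeLong)
    }

    def read(in: DataInputStream, restoreCursor: Boolean): Unit = {
      val used = in.readInt()
      page = Array.fill(used)(in.readLong())
      // Without this, cursor keeps its initial value, so the next write()
      // computes used = 0 and silently drops the page -- the SPARK-24809 bug.
      cursor = if (restoreCursor) used * 8 + OFFSET else OFFSET
    }
  }

  def roundTrip(m: TinyMap, restoreCursor: Boolean): TinyMap = {
    val bytes = new ByteArrayOutputStream()
    m.write(new DataOutputStream(bytes))
    val copy = new TinyMap
    copy.read(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray)), restoreCursor)
    copy
  }

  def main(args: Array[String]): Unit = {
    val original = new TinyMap
    // One round trip models driver -> executor; the second models the executor
    // re-serializing the relation. Only the second one corrupts the data.
    val buggy = roundTrip(roundTrip(original, restoreCursor = false), restoreCursor = false)
    val fixed = roundTrip(roundTrip(original, restoreCursor = true), restoreCursor = true)
    println(s"without cursor restore: [${buggy.page.mkString(",")}]")  // [] -> data lost
    println(s"with cursor restore:    [${fixed.page.mkString(",")}]")  // [42,43]
  }
}

Running the sketch prints an empty page for the un-restored copy and 42,43 once the cursor is restored on read, mirroring the data error the patch title describes.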
From f67ff4d91764e67f811ebfb7c02ab466153a5b02 Mon Sep 17 00:00:00 2001
From: liulijia
Date: Thu, 19 Jul 2018 15:28:10 +0800
Subject: [PATCH 2/5] [SPARK-24809][SQL] update code style, add UT.

---
 .../sql/execution/joins/HashedRelation.scala  | 16 ++++----
 .../execution/joins/HashedRelationSuite.scala | 33 +++++++++++++++++++
 2 files changed, 41 insertions(+), 8 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
index 9943596d2c437..b5bc637dcae4e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
@@ -726,10 +726,10 @@ private[execution] final class LongToUnsafeRowMap(val mm: TaskMemoryManager, cap
 
     writeLong(array.length)
     writeLongArray(writeBuffer, array, array.length)
-    val cursorFlag = cursor - Platform.LONG_ARRAY_OFFSET
-    writeLong(cursorFlag)
-    val used = (cursorFlag / 8).toInt
-    writeLongArray(writeBuffer, page, used)
+
+    val usedWordsNumber = ((cursor - Platform.LONG_ARRAY_OFFSET) / 8).toInt
+    writeLong(usedWordsNumber)
+    writeLongArray(writeBuffer, page, usedWordsNumber)
   }
 
   override def writeExternal(output: ObjectOutput): Unit = {
@@ -771,10 +771,10 @@ private[execution] final class LongToUnsafeRowMap(val mm: TaskMemoryManager, cap
     val length = readLong().toInt
     mask = length - 2
     array = readLongArray(readBuffer, length)
-    val cursorFlag = readLong()
-    cursor = cursorFlag + Platform.LONG_ARRAY_OFFSET
-    val pageLength = (cursorFlag / 8).toInt
-    page = readLongArray(readBuffer, pageLength)
+    val usedWordsNumber = readLong().toInt
+    // Set cursor because cursor is used in write function.
+    cursor = usedWordsNumber * 8 + Platform.LONG_ARRAY_OFFSET
+    page = readLongArray(readBuffer, usedWordsNumber)
   }
 
   override def readExternal(in: ObjectInput): Unit = {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/HashedRelationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/HashedRelationSuite.scala
index 037cc2e3ccad7..eaf4877e983d9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/HashedRelationSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/HashedRelationSuite.scala
@@ -278,6 +278,39 @@ class HashedRelationSuite extends SparkFunSuite with SharedSQLContext {
     map.free()
   }
 
+  test("SPARK-24809: Serializing LongHashedRelation in executor may result in data error") {
+    val unsafeProj = UnsafeProjection.create(Array[DataType](LongType))
+    val originalMap = new LongToUnsafeRowMap(mm, 1)
+
+    val key1 = 1L
+    val value1 = new Random().nextLong()
+
+    val key2 = 2L
+    val value2 = new Random().nextLong()
+
+    originalMap.append(key1, unsafeProj(InternalRow(value1)))
+    originalMap.append(key2, unsafeProj(InternalRow(value2)))
+    originalMap.optimize()
+
+    val resultRow = new UnsafeRow(1)
+    assert(originalMap.getValue(key1, resultRow).getLong(0) === value1)
+    assert(originalMap.getValue(key2, resultRow).getLong(0) === value2)
+
+    val ser = new KryoSerializer(
+      (new SparkConf).set("spark.kryo.referenceTracking", "false")).newInstance()
+
+    val mapSerializedInDriver = ser.deserialize[LongToUnsafeRowMap](ser.serialize(originalMap))
+    val mapSerializedInExecutor =
+      ser.deserialize[LongToUnsafeRowMap](ser.serialize(mapSerializedInDriver))
+
+    assert(mapSerializedInExecutor.getValue(key1, resultRow).getLong(0) === value1)
+    assert(mapSerializedInExecutor.getValue(key2, resultRow).getLong(0) === value2)
+
+    originalMap.free()
+    mapSerializedInDriver.free()
+    mapSerializedInExecutor.free()
+  }
+
   test("Spark-14521") {
     val ser = new KryoSerializer(
       (new SparkConf).set("spark.kryo.referenceTracking", "false")).newInstance()
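The test above round-trips the map through Kryo twice because the bug only shows up on the second serialization: the first trip models the driver shipping the relation, the second an executor re-serializing it. A hypothetical generic helper distilling that pattern -- RoundTripTwice is an invented name, not part of the patch, while KryoSerializer and SerializerInstance are the real Spark APIs the test uses:

import scala.reflect.ClassTag
import org.apache.spark.SparkConf
import org.apache.spark.serializer.{KryoSerializer, SerializerInstance}

object RoundTripTwice {
  // Serialize/deserialize twice: before the fix, the second trip returned a
  // LongToUnsafeRowMap whose page had been written out with zero length.
  def apply[T: ClassTag](ser: SerializerInstance, value: T): T = {
    val once = ser.deserialize[T](ser.serialize(value))
    ser.deserialize[T](ser.serialize(once))
  }

  // Same serializer setup the test constructs by hand.
  def kryoInstance(): SerializerInstance =
    new KryoSerializer(
      new SparkConf().set("spark.kryo.referenceTracking", "false")).newInstance()
}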
From 06a95472f1e889cdd21e7051fc906526f333f6d3 Mon Sep 17 00:00:00 2001
From: liulijia
Date: Tue, 24 Jul 2018 11:19:27 +0800
Subject: [PATCH 3/5] do not change existing code

---
 .../spark/sql/execution/joins/HashedRelation.scala | 13 ++++++-------
 1 file changed, 6 insertions(+), 7 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
index b5bc637dcae4e..08e068b18ffe2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
@@ -726,10 +726,9 @@ private[execution] final class LongToUnsafeRowMap(val mm: TaskMemoryManager, cap
 
     writeLong(array.length)
     writeLongArray(writeBuffer, array, array.length)
-
-    val usedWordsNumber = ((cursor - Platform.LONG_ARRAY_OFFSET) / 8).toInt
-    writeLong(usedWordsNumber)
-    writeLongArray(writeBuffer, page, usedWordsNumber)
+    val used = ((cursor - Platform.LONG_ARRAY_OFFSET) / 8).toInt
+    writeLong(used)
+    writeLongArray(writeBuffer, page, used)
   }
 
   override def writeExternal(output: ObjectOutput): Unit = {
@@ -771,10 +770,10 @@ private[execution] final class LongToUnsafeRowMap(val mm: TaskMemoryManager, cap
     val length = readLong().toInt
     mask = length - 2
     array = readLongArray(readBuffer, length)
-    val usedWordsNumber = readLong().toInt
+    val pageLength = readLong().toInt
+    page = readLongArray(readBuffer, pageLength)
     // Set cursor because cursor is used in write function.
-    cursor = usedWordsNumber * 8 + Platform.LONG_ARRAY_OFFSET
-    page = readLongArray(readBuffer, usedWordsNumber)
+    cursor = pageLength * 8 + Platform.LONG_ARRAY_OFFSET
   }
 
   override def readExternal(in: ObjectInput): Unit = {

From c9ebfd0acdeefa1495b48df84b137ea213b2f7fc Mon Sep 17 00:00:00 2001
From: liulijia
Date: Tue, 24 Jul 2018 13:04:22 +0800
Subject: [PATCH 4/5] optimize code style.

---
 .../sql/execution/joins/HashedRelation.scala  |  2 +-
 .../execution/joins/HashedRelationSuite.scala | 27 +++++++++----------
 2 files changed, 13 insertions(+), 16 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
index 08e068b18ffe2..86eb47a70f1ad 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
@@ -772,7 +772,7 @@ private[execution] final class LongToUnsafeRowMap(val mm: TaskMemoryManager, cap
     array = readLongArray(readBuffer, length)
     val pageLength = readLong().toInt
     page = readLongArray(readBuffer, pageLength)
-    // Set cursor because cursor is used in write function.
+    // Restore cursor variable to make this map able to be serialized again on executors.
     cursor = pageLength * 8 + Platform.LONG_ARRAY_OFFSET
   }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/HashedRelationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/HashedRelationSuite.scala
index eaf4877e983d9..ba736c57623b1 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/HashedRelationSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/HashedRelationSuite.scala
@@ -278,37 +278,34 @@ class HashedRelationSuite extends SparkFunSuite with SharedSQLContext {
     map.free()
   }
 
-  test("SPARK-24809: Serializing LongHashedRelation in executor may result in data error") {
+  test("SPARK-24809: Serializing LongToUnsafeRowMap in executor may result in data error") {
     val unsafeProj = UnsafeProjection.create(Array[DataType](LongType))
     val originalMap = new LongToUnsafeRowMap(mm, 1)
 
     val key1 = 1L
-    val value1 = new Random().nextLong()
+    val value1 = 4852306286022334418L
 
     val key2 = 2L
-    val value2 = new Random().nextLong()
+    val value2 = 8813607448788216010L
 
     originalMap.append(key1, unsafeProj(InternalRow(value1)))
     originalMap.append(key2, unsafeProj(InternalRow(value2)))
     originalMap.optimize()
 
-    val resultRow = new UnsafeRow(1)
-    assert(originalMap.getValue(key1, resultRow).getLong(0) === value1)
-    assert(originalMap.getValue(key2, resultRow).getLong(0) === value2)
-
     val ser = new KryoSerializer(
       (new SparkConf).set("spark.kryo.referenceTracking", "false")).newInstance()
+    // Simulate serialize/deserialize twice on driver and executor
+    val firstTimeSerialized = ser.deserialize[LongToUnsafeRowMap](ser.serialize(originalMap))
+    val secondTimeSerialized =
+      ser.deserialize[LongToUnsafeRowMap](ser.serialize(firstTimeSerialized))
 
-    val mapSerializedInDriver = ser.deserialize[LongToUnsafeRowMap](ser.serialize(originalMap))
-    val mapSerializedInExecutor =
-      ser.deserialize[LongToUnsafeRowMap](ser.serialize(mapSerializedInDriver))
-
-    assert(mapSerializedInExecutor.getValue(key1, resultRow).getLong(0) === value1)
-    assert(mapSerializedInExecutor.getValue(key2, resultRow).getLong(0) === value2)
+    val resultRow = new UnsafeRow(1)
+    assert(secondTimeSerialized.getValue(key1, resultRow).getLong(0) === value1)
+    assert(secondTimeSerialized.getValue(key2, resultRow).getLong(0) === value2)
 
     originalMap.free()
-    mapSerializedInDriver.free()
-    mapSerializedInExecutor.free()
+    firstTimeSerialized.free()
+    secondTimeSerialized.free()
   }
 
   test("Spark-14521") {
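Two improvements in patch 4 above: the random test values are replaced with pinned longs so a failure reproduces bit-for-bit, and the asserts now run after the double round trip, validating the twice-deserialized copy instead of the original map. As background, a small standalone sketch of the UnsafeProjection/UnsafeRow pattern the test leans on -- illustrative only; the Spark classes named are real, and the copy() caveat reflects that a projection reuses its result buffer:

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{UnsafeProjection, UnsafeRow}
import org.apache.spark.sql.types.{DataType, LongType}

object ProjectionSketch {
  def main(args: Array[String]): Unit = {
    // Project a single-long InternalRow into the UnsafeRow format that
    // LongToUnsafeRowMap stores in its page.
    val proj = UnsafeProjection.create(Array[DataType](LongType))
    val row: UnsafeRow = proj(InternalRow(4852306286022334418L))
    // The projection reuses its result buffer, so copy before keeping the row.
    val stored = row.copy()
    assert(stored.getLong(0) == 4852306286022334418L)
  }
}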
From 6246dfaff1bd04820f9b71cae60f663b45515d7a Mon Sep 17 00:00:00 2001
From: liulijia
Date: Sun, 29 Jul 2018 22:03:24 +0800
Subject: [PATCH 5/5] small update.

---
 .../apache/spark/sql/execution/joins/HashedRelationSuite.scala | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/HashedRelationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/HashedRelationSuite.scala
index ba736c57623b1..d9b34dcd16476 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/HashedRelationSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/HashedRelationSuite.scala
@@ -292,8 +292,7 @@ class HashedRelationSuite extends SparkFunSuite with SharedSQLContext {
     originalMap.append(key2, unsafeProj(InternalRow(value2)))
     originalMap.optimize()
 
-    val ser = new KryoSerializer(
-      (new SparkConf).set("spark.kryo.referenceTracking", "false")).newInstance()
+    val ser = sparkContext.env.serializer.newInstance()
     // Simulate serialize/deserialize twice on driver and executor
     val firstTimeSerialized = ser.deserialize[LongToUnsafeRowMap](ser.serialize(originalMap))
     val secondTimeSerialized =
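Patch 5 above switches the test to sparkContext.env.serializer, i.e. whatever spark.serializer the test's SparkContext configures (Spark's default is Java serialization), so the regression test now also exercises the Externalizable path rather than only a hand-built Kryo instance. Both serialization front-ends funnel into the same private write()/read() helpers, which is why restoring cursor inside read() fixes both; a paraphrased sketch of that delegation follows (shape simplified from the patched class, exact signatures are an assumption here):

import java.io.{Externalizable, ObjectInput, ObjectOutput}
import com.esotericsoftware.kryo.{Kryo, KryoSerializable}
import com.esotericsoftware.kryo.io.{Input, Output}

// Paraphrased shape only: both serialization front-ends delegate to one
// write()/read() pair, so restoring cursor inside read() covers Java and Kryo.
class DelegatingMap extends Externalizable with KryoSerializable {
  private def write(writeBoolean: Boolean => Unit,
                    writeLong: Long => Unit,
                    writeBuffer: (Array[Byte], Int, Int) => Unit): Unit = { /* ... */ }

  private def read(readBoolean: () => Boolean,
                   readLong: () => Long,
                   readBuffer: (Array[Byte], Int, Int) => Unit): Unit = { /* cursor restored here */ }

  override def writeExternal(out: ObjectOutput): Unit =
    write(out.writeBoolean, out.writeLong, out.write)
  override def readExternal(in: ObjectInput): Unit =
    read(() => in.readBoolean(), () => in.readLong(), in.readFully)
  override def write(kryo: Kryo, out: Output): Unit =
    write(out.writeBoolean, out.writeLong, out.write)
  override def read(kryo: Kryo, in: Input): Unit =
    read(() => in.readBoolean(), () => in.readLong(), in.readBytes)
}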