
Commit 302c017

mcdull-zhang authored and cloud-fan committed
[SPARK-38542][SQL] UnsafeHashedRelation should serialize numKeys out
### What changes were proposed in this pull request?

UnsafeHashedRelation should serialize numKeys out.

### Why are the changes needed?

One case I found was this: we turned on ReusedExchange(BroadcastExchange), but the returned UnsafeHashedRelation was missing numKeys. The reason is that TorrentBroadcast._value is currently a SoftReference, so the UnsafeHashedRelation obtained by re-deserialization loses numKeys, which leads to incorrect calculation results.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Added an assert to an existing unit test.

Closes #35836 from mcdull-zhang/UnsafeHashed.

Authored-by: mcdull-zhang <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
(cherry picked from commit 8476c8b)
Signed-off-by: Wenchen Fan <[email protected]>
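The underlying problem is a symmetry bug in custom externalization: `numKeys` was populated when the relation was first built, but never written during serialization, so a copy rebuilt from bytes (for example, after the `SoftReference` in `TorrentBroadcast._value` is cleared and the value re-deserialized) silently comes back with a default `numKeys`. A minimal sketch of the failure mode and the fix, using hypothetical names (not Spark's actual classes):

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}

// Hypothetical stand-in for a relation with hand-written externalization.
// The fix: write() and read() must cover every field, in the same order.
class SketchRelation(var numKeys: Int, var numFields: Int) {
  def write(out: DataOutputStream): Unit = {
    out.writeInt(numKeys)    // the line the patch adds: without it,
    out.writeInt(numFields)  // numKeys is lost on every round trip
  }
  def read(in: DataInputStream): Unit = {
    numKeys = in.readInt()   // must mirror write() exactly
    numFields = in.readInt()
  }
}

object SketchRelation {
  // Round-trip helper: serialize to bytes, then rebuild a fresh instance,
  // as happens when a cleared broadcast value is re-deserialized.
  def roundTrip(r: SketchRelation): SketchRelation = {
    val buf = new ByteArrayOutputStream()
    r.write(new DataOutputStream(buf))
    val restored = new SketchRelation(0, 0)
    restored.read(new DataInputStream(new ByteArrayInputStream(buf.toByteArray)))
    restored
  }
}
```

Before the patch, the real `writeExternal` path started at `writeInt(numFields)`, so a re-deserialized relation kept its map contents but had `numKeys == 0`.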
1 parent bc69e6c commit 302c017

File tree

2 files changed: +6 −1 lines changed


sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala

Lines changed: 3 additions & 1 deletion
@@ -197,7 +197,7 @@ private[execution] class ValueRowWithKeyIndex {
  * A HashedRelation for UnsafeRow, which is backed BytesToBytesMap.
  *
  * It's serialized in the following format:
- *  [number of keys]
+ *  [number of keys] [number of fields]
  *  [size of key] [size of value] [key bytes] [bytes for value]
  */
 private[joins] class UnsafeHashedRelation(
@@ -352,6 +352,7 @@ private[joins] class UnsafeHashedRelation(
       writeInt: (Int) => Unit,
       writeLong: (Long) => Unit,
       writeBuffer: (Array[Byte], Int, Int) => Unit) : Unit = {
+    writeInt(numKeys)
     writeInt(numFields)
     // TODO: move these into BytesToBytesMap
     writeLong(binaryMap.numKeys())
@@ -385,6 +386,7 @@ private[joins] class UnsafeHashedRelation(
       readInt: () => Int,
       readLong: () => Long,
       readBuffer: (Array[Byte], Int, Int) => Unit): Unit = {
+    numKeys = readInt()
     numFields = readInt()
     resultRow = new UnsafeRow(numFields)
     val nKeys = readLong()

sql/core/src/test/scala/org/apache/spark/sql/execution/joins/HashedRelationSuite.scala

Lines changed: 3 additions & 0 deletions
@@ -93,6 +93,9 @@ class HashedRelationSuite extends SharedSparkSession {
     assert(hashed2.get(toUnsafe(InternalRow(10))) === null)
     assert(hashed2.get(unsafeData(2)).toArray === data2)
 
+    // SPARK-38542: UnsafeHashedRelation should serialize numKeys out
+    assert(hashed2.keys().map(_.copy()).forall(_.numFields == 1))
+
     val os2 = new ByteArrayOutputStream()
     val out2 = new ObjectOutputStream(os2)
     hashed2.asInstanceOf[UnsafeHashedRelation].writeExternal(out2)
