Skip to content

Commit 3742359

Browse files
author
kai
committed
Fix not-serializable exception for code-generated keys in broadcasted relations
1 parent 14e4bf8 commit 3742359

File tree

2 files changed

+3
-9
lines changed

2 files changed

+3
-9
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashOuterJoin.scala

Lines changed: 3 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -25,6 +25,7 @@ import org.apache.spark.sql.catalyst.plans.{JoinType, LeftOuter, RightOuter}
2525
import org.apache.spark.sql.execution.{BinaryNode, SparkPlan}
2626
import org.apache.spark.util.ThreadUtils
2727

28+
import scala.collection.JavaConversions._
2829
import scala.concurrent._
2930
import scala.concurrent.duration._
3031

@@ -77,8 +78,8 @@ case class BroadcastHashOuterJoin(
7778
// Note that we use .execute().collect() because we don't want to convert data to Scala types
7879
val input: Array[InternalRow] = buildPlan.execute().map(_.copy()).collect()
7980
// buildHashTable uses code-generated rows as keys, which are not serializable
80-
val hashed = new GeneralHashedRelation(
81-
buildHashTable(input.iterator, newProjection(buildKeys, buildPlan.output)))
81+
val hashed =
82+
buildHashTable(input.iterator, new InterpretedProjection(buildKeys, buildPlan.output))
8283
sparkContext.broadcast(hashed)
8384
}(BroadcastHashOuterJoin.broadcastHashOuterJoinExecutionContext)
8485

sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala

Lines changed: 0 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -32,13 +32,6 @@ import org.apache.spark.util.collection.CompactBuffer
3232
private[joins] sealed trait HashedRelation {
3333
def get(key: InternalRow): CompactBuffer[InternalRow]
3434

35-
def getOrElse(
36-
key: InternalRow,
37-
default: CompactBuffer[InternalRow]): CompactBuffer[InternalRow] = {
38-
val v = get(key)
39-
if (v eq null) default else v
40-
}
41-
4235
// This is a helper method to implement Externalizable, and is used by
4336
// GeneralHashedRelation and UniqueKeyHashedRelation
4437
protected def writeBytes(out: ObjectOutput, serialized: Array[Byte]): Unit = {

0 commit comments

Comments
 (0)