Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package org.apache.spark.sql.execution.joins
import scala.concurrent._
import scala.concurrent.duration._

import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Expression
Expand Down Expand Up @@ -62,10 +63,29 @@ case class BroadcastHashJoin(
override def requiredChildDistribution: Seq[Distribution] =
UnspecifiedDistribution :: UnspecifiedDistribution :: Nil

private[this] val broadcastFutureInitLock = new Object()

// Use @volatile so we can read a snapshot of this without locking.
@volatile
private[this] var broadcastFutureValue: Option[Future[Broadcast[HashedRelation]]] = None

// Use lazy so that we won't do broadcast when calling explain but still cache the broadcast value
// for the same query.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This comment needs to be updated. Have you checked that the behavior described in the comment is maintained?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This comment needs to be updated. Have you checked that the behavior described in the comment is maintained?

@transient
private lazy val broadcastFuture = {
private def broadcastFuture = {
broadcastFutureInitLock.synchronized {
if (broadcastFutureValue.isEmpty) {
broadcastFutureValue = Some(createBroadcastFuture)
}
broadcastFutureValue.get
}
}

/**
* Exposes the broadcast future so we can do external accounting of memory usage.
*/
def broadcastFutureOpt: Option[Future[Broadcast[HashedRelation]]] = broadcastFutureValue

private def createBroadcastFuture: Future[Broadcast[HashedRelation]] = {
val numBuildRows = buildSide match {
case BuildLeft => longMetric("numLeftRows")
case BuildRight => longMetric("numRightRows")
Expand Down