diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala index b9972b8cf017..0adcaf548af4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala @@ -26,12 +26,14 @@ import scala.util.control.NonFatal import org.apache.spark.{broadcast, SparkException} import org.apache.spark.launcher.SparkLauncher import org.apache.spark.rdd.RDD +import org.apache.spark.scheduler.{JobFailed, SparkListenerJobEnd} import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.UnsafeRow import org.apache.spark.sql.catalyst.plans.physical.{BroadcastMode, BroadcastPartitioning, Partitioning} import org.apache.spark.sql.execution.{SparkPlan, SQLExecution} import org.apache.spark.sql.execution.joins.HashedRelation import org.apache.spark.sql.execution.metric.SQLMetrics +import org.apache.spark.sql.execution.ui.SQLExecutionUIData import org.apache.spark.sql.internal.SQLConf import org.apache.spark.util.{SparkFatalException, ThreadUtils} @@ -147,6 +149,15 @@ case class BroadcastExchangeExec( } catch { case ex: TimeoutException => logError(s"Could not execute broadcast in ${timeout.toSeconds} secs.", ex) + val executionUIData = sqlContext.sparkSession.sharedState.statusStore. + execution(sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY).toInt). + getOrElse(ex).asInstanceOf[SQLExecutionUIData] + executionUIData.jobs.keySet.foreach { jobId => + sparkContext.dagScheduler. + cancelJob(jobId, Option("Job cacelled due to broadcast timeout.")) + sparkContext.listenerBus.post(SparkListenerJobEnd(jobId, + System.currentTimeMillis(), JobFailed(ex))) + } throw new SparkException(s"Could not execute broadcast in ${timeout.toSeconds} secs. " + s"You can increase the timeout for broadcasts via ${SQLConf.BROADCAST_TIMEOUT.key} or " + s"disable broadcast join by setting ${SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key} to -1",