From 43bfd8189c140aa033d2ee60732aaabd72ca9379 Mon Sep 17 00:00:00 2001
From: s71955
Date: Fri, 8 Mar 2019 01:16:51 +0530
Subject: [PATCH] [SPARK-27036][SQL] Cancel the running jobs in the background
 once broadcast timeout occurs

## What changes were proposed in this pull request?

Currently, even when the broadcast thread times out, the jobs it submitted are not aborted and keep running in the background. By design, the broadcast future submits a job whose result must be collected and broadcast within a fixed timeout; when that timeout fires, the already-scheduled jobs are not killed and continue to run even though the driver has given up on them.

As a fix, this patch looks up the jobs belonging to the current execution id in the app status store and cancels each of them before rethrowing the future's timeout as a SparkException. This terminates the jobs promptly when the TimeoutException occurs and frees the resources that would otherwise stay occupied after the driver has already failed. With this patch the cancelled jobs are also shown as failed in the UI.

## How was this patch tested?

Manually.
---
 .../execution/exchange/BroadcastExchangeExec.scala | 11 +++++++++++
 1 file changed, 11 insertions(+)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala
index b9972b8cf017..0adcaf548af4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala
@@ -26,12 +26,14 @@ import scala.util.control.NonFatal
 import org.apache.spark.{broadcast, SparkException}
 import org.apache.spark.launcher.SparkLauncher
 import org.apache.spark.rdd.RDD
+import org.apache.spark.scheduler.{JobFailed, SparkListenerJobEnd}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.UnsafeRow
 import org.apache.spark.sql.catalyst.plans.physical.{BroadcastMode, BroadcastPartitioning, Partitioning}
 import org.apache.spark.sql.execution.{SparkPlan, SQLExecution}
 import org.apache.spark.sql.execution.joins.HashedRelation
 import org.apache.spark.sql.execution.metric.SQLMetrics
+import org.apache.spark.sql.execution.ui.SQLExecutionUIData
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.util.{SparkFatalException, ThreadUtils}
 
@@ -147,6 +149,15 @@ case class BroadcastExchangeExec(
     } catch {
       case ex: TimeoutException =>
         logError(s"Could not execute broadcast in ${timeout.toSeconds} secs.", ex)
+        val executionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY).toLong
+        val executionUIData: Option[SQLExecutionUIData] =
+          sqlContext.sparkSession.sharedState.statusStore.execution(executionId)
+        executionUIData.toSeq.flatMap(_.jobs.keySet).foreach { jobId =>
+          sparkContext.dagScheduler
+            .cancelJob(jobId, Option("Job cancelled due to broadcast timeout."))
+          sparkContext.listenerBus.post(SparkListenerJobEnd(jobId,
+            System.currentTimeMillis(), JobFailed(ex)))
+        }
         throw new SparkException(s"Could not execute broadcast in ${timeout.toSeconds} secs. " +
           s"You can increase the timeout for broadcasts via ${SQLConf.BROADCAST_TIMEOUT.key} or " +
           s"disable broadcast join by setting ${SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key} to -1",
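
Since the patch was only tested manually, here is a minimal verification sketch. It is not part of the patch; the object name `BroadcastTimeoutRepro`, the local master URL, the 2-second timeout, and the dataset sizes are all illustrative assumptions. It forces a broadcast timeout so you can check in the Spark UI that the broadcast collect job now ends as failed instead of running on in the background.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.broadcast

// Hypothetical repro driver, not shipped with this patch.
object BroadcastTimeoutRepro {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("broadcast-timeout-repro")
      // Aggressively short broadcast timeout (seconds).
      .config("spark.sql.broadcastTimeout", "2")
      .getOrCreate()
    import spark.implicits._

    // Slow broadcast side: the per-row sleep keeps the collect job
    // running well past the 2-second broadcast timeout.
    val slow = spark.range(0, 100000).as[Long]
      .map { i => Thread.sleep(1); i }
      .toDF("id")
    val big = spark.range(0, 1000000).toDF("id")

    // broadcast() plans a BroadcastExchangeExec over the slow side. Without
    // this patch the collect job keeps running after the TimeoutException;
    // with it, the job should appear as failed in the UI.
    big.join(broadcast(slow), "id").count()
  }
}
```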