@@ -26,12 +26,14 @@ import scala.util.control.NonFatal
import org.apache.spark.{broadcast, SparkException}
import org.apache.spark.launcher.SparkLauncher
import org.apache.spark.rdd.RDD
import org.apache.spark.scheduler.{JobFailed, SparkListenerJobEnd}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.UnsafeRow
import org.apache.spark.sql.catalyst.plans.physical.{BroadcastMode, BroadcastPartitioning, Partitioning}
import org.apache.spark.sql.execution.{SparkPlan, SQLExecution}
import org.apache.spark.sql.execution.joins.HashedRelation
import org.apache.spark.sql.execution.metric.SQLMetrics
import org.apache.spark.sql.execution.ui.SQLExecutionUIData
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.util.{SparkFatalException, ThreadUtils}

@@ -147,6 +149,15 @@ case class BroadcastExchangeExec(
} catch {
case ex: TimeoutException =>
logError(s"Could not execute broadcast in ${timeout.toSeconds} secs.", ex)
val executionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY).toLong
val executionUIDataOpt = sqlContext.sparkSession.sharedState.statusStore.execution(executionId)
Contributor:
Is this a reliable way to get the associated jobs?

Contributor Author:
SQLAppStatusListener holds the live execution data, so I use the same store to get the associated jobs. If this is not an efficient way of getting them, I will revisit the code and try to find a better mechanism for retrieving the jobs for a particular execution id. Please let me know if you have any suggestions; thanks for your valuable time.

Contributor Author (@sujith71955, Mar 17, 2019):
I cannot find an alternative way to get the jobs for an execution id at this layer of execution, and this approach should be reliable: whenever we submit or process events via the DAGScheduler, the same events are posted to SQLAppStatusListener, so we can retrieve the jobs from LiveExecutionData.
Please let me know if there is a better way to get this done. Thanks.
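
For illustration, a minimal sketch of the lookup being discussed, assuming code that runs inside Spark's sql package (sharedState is not public API in all Spark versions) and an illustrative execution id:

// Sketch: retrieve the job ids recorded for a SQL execution from the
// status store that SQLAppStatusListener populates.
// `spark` is a SparkSession; the execution id value is illustrative.
val executionId = 0L
val jobIds: Set[Int] = spark.sharedState.statusStore
  .execution(executionId)   // Option[SQLExecutionUIData]
  .map(_.jobs.keySet)       // job ids tied to this execution
  .getOrElse(Set.empty)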

// Cancel every job recorded for this execution and post a matching
// SparkListenerJobEnd, guarding against a missing store entry instead
// of casting the Option's fallback value.
executionUIDataOpt.foreach { executionUIData =>
  executionUIData.jobs.keySet.foreach { jobId =>
    sparkContext.dagScheduler.
      cancelJob(jobId, Option("Job cancelled due to broadcast timeout."))
    sparkContext.listenerBus.post(SparkListenerJobEnd(jobId,
      System.currentTimeMillis(), JobFailed(ex)))
  }
}
throw new SparkException(s"Could not execute broadcast in ${timeout.toSeconds} secs. " +
s"You can increase the timeout for broadcasts via ${SQLConf.BROADCAST_TIMEOUT.key} or " +
s"disable broadcast join by setting ${SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key} to -1",
ex)
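
As the error message notes, users can either raise the broadcast timeout or disable automatic broadcast joins. A small sketch of both settings, assuming a SparkSession named spark (the values are illustrative):

// Raise the broadcast timeout (seconds; the default is 300).
spark.conf.set("spark.sql.broadcastTimeout", 600)
// Or disable automatic broadcast joins entirely.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)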