diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala
index 0c5fee20385e..c322d5eef590 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala
@@ -24,7 +24,7 @@ import scala.concurrent.{ExecutionContext, Promise}
 import scala.concurrent.duration.NANOSECONDS
 import scala.util.control.NonFatal
 
-import org.apache.spark.{broadcast, SparkException}
+import org.apache.spark.{broadcast, SparkContext, SparkException}
 import org.apache.spark.launcher.SparkLauncher
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
@@ -74,7 +74,10 @@ case class BroadcastExchangeExec(
     child: SparkPlan) extends BroadcastExchangeLike {
   import BroadcastExchangeExec._
 
-  override val runId: UUID = UUID.randomUUID
+  // Cancelling a SQL statement from Spark ThriftServer needs to cancel
+  // its related broadcast sub-jobs. So set the run id to the job group id if it exists.
+  override val runId: UUID = Option(sparkContext.getLocalProperty(SparkContext.SPARK_JOB_GROUP_ID))
+    .map(UUID.fromString).getOrElse(UUID.randomUUID)
 
   override lazy val metrics = Map(
     "dataSize" -> SQLMetrics.createSizeMetric(sparkContext, "data size"),
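
A minimal sketch of how a caller could exercise this change, assuming the patch above is applied. Because BroadcastExchangeExec already sets the broadcast sub-job's group to runId.toString (with interruptOnCancel), deriving runId from SPARK_JOB_GROUP_ID means cancelling the statement's job group now also cancels its broadcast sub-jobs. The driver program below, the dataset sizes, and the sleep-based timing are illustrative assumptions, not part of the patch; note the group id must be a valid UUID string, since the patch parses it with UUID.fromString.

    import java.util.UUID

    import org.apache.spark.sql.SparkSession

    object BroadcastCancellationSketch {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .master("local[2]")
          .appName("broadcast-cancel-sketch")
          .getOrCreate()
        val sc = spark.sparkContext

        // Must be a parseable UUID: the patched runId is UUID.fromString(groupId).
        val groupId = UUID.randomUUID().toString
        sc.setJobGroup(groupId, "statement with a broadcast join", interruptOnCancel = true)

        // Spark's thread-local job properties are inherited by child threads,
        // so this thread plans and runs the query under groupId.
        val statement = new Thread(() => {
          try {
            val small = spark.range(1000).toDF("id")
            val large = spark.range(1000000000L).toDF("id")
            // The BroadcastExchangeExec planned here picks up groupId as its
            // runId, so cancelling the group also cancels the broadcast job.
            large.join(small, "id").count()
          } catch {
            case e: Exception => println(s"statement cancelled: ${e.getMessage}")
          }
        })
        statement.start()

        Thread.sleep(1000) // let the statement get going (illustrative only)
        sc.cancelJobGroup(groupId) // cancels the statement and its broadcast sub-jobs
        statement.join()
        spark.stop()
      }
    }

Without the patch, runId is an unrelated random UUID, so cancelJobGroup(groupId) stops the statement's own jobs but leaves the broadcast sub-job running under its own group; this is the ThriftServer leak the comment in the diff describes.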