From 32adf0388d65d656e0a513f8cf25b699c96f1519 Mon Sep 17 00:00:00 2001 From: LantaoJin Date: Mon, 11 Jan 2021 13:50:48 +0800 Subject: [PATCH 1/4] [SPARK-34064][SQL] Cancel the running broadcast sub-jobs when SQL statement is cancelled --- .../sql/execution/exchange/BroadcastExchangeExec.scala | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala index 0c5fee20385e..90cfa0182b2a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala @@ -24,7 +24,7 @@ import scala.concurrent.{ExecutionContext, Promise} import scala.concurrent.duration.NANOSECONDS import scala.util.control.NonFatal -import org.apache.spark.{broadcast, SparkException} +import org.apache.spark.{broadcast, SparkContext, SparkException} import org.apache.spark.launcher.SparkLauncher import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow @@ -74,7 +74,9 @@ case class BroadcastExchangeExec( child: SparkPlan) extends BroadcastExchangeLike { import BroadcastExchangeExec._ - override val runId: UUID = UUID.randomUUID + private val groupId = sparkContext.getLocalProperty(SparkContext.SPARK_JOB_GROUP_ID) + + override val runId: UUID = if (groupId == null) UUID.randomUUID else UUID.fromString(groupId) override lazy val metrics = Map( "dataSize" -> SQLMetrics.createSizeMetric(sparkContext, "data size"), @@ -113,7 +115,7 @@ case class BroadcastExchangeExec( // Setup a job group here so later it may get cancelled by groupId if necessary. sparkContext.setJobGroup(runId.toString, s"broadcast exchange (runId $runId)", interruptOnCancel = true) - val beforeCollect = System.nanoTime() + val beforeCollect = Sgystem.nanoTime() // Use executeCollect/executeCollectIterator to avoid conversion to Scala types val (numRows, input) = child.executeCollectIterator() longMetric("numOutputRows") += numRows From 804f4da5ad63bb83720bc0d0265d3d3a0c48ab0f Mon Sep 17 00:00:00 2001 From: LantaoJin Date: Mon, 11 Jan 2021 14:35:53 +0800 Subject: [PATCH 2/4] fix typo --- .../spark/sql/execution/exchange/BroadcastExchangeExec.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala index 90cfa0182b2a..e8e999ad367e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala @@ -115,7 +115,7 @@ case class BroadcastExchangeExec( // Setup a job group here so later it may get cancelled by groupId if necessary. sparkContext.setJobGroup(runId.toString, s"broadcast exchange (runId $runId)", interruptOnCancel = true) - val beforeCollect = Sgystem.nanoTime() + val beforeCollect = System.nanoTime() // Use executeCollect/executeCollectIterator to avoid conversion to Scala types val (numRows, input) = child.executeCollectIterator() longMetric("numOutputRows") += numRows From 03d79d6d805773c8cc1093a57134b41c02bb3bb8 Mon Sep 17 00:00:00 2001 From: LantaoJin Date: Mon, 11 Jan 2021 16:19:11 +0800 Subject: [PATCH 3/4] use Option --- .../spark/sql/execution/exchange/BroadcastExchangeExec.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala index e8e999ad367e..535ee25709ac 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala @@ -76,7 +76,7 @@ case class BroadcastExchangeExec( private val groupId = sparkContext.getLocalProperty(SparkContext.SPARK_JOB_GROUP_ID) - override val runId: UUID = if (groupId == null) UUID.randomUUID else UUID.fromString(groupId) + override val runId: UUID = Option(groupId).map(UUID.fromString).getOrElse(UUID.randomUUID) override lazy val metrics = Map( "dataSize" -> SQLMetrics.createSizeMetric(sparkContext, "data size"), From b3f1453925dd8ba0bbb43b9f3d0b701e7c6f9507 Mon Sep 17 00:00:00 2001 From: LantaoJin Date: Wed, 13 Jan 2021 16:05:04 +0800 Subject: [PATCH 4/4] inline groupId --- .../sql/execution/exchange/BroadcastExchangeExec.scala | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala index 535ee25709ac..c322d5eef590 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala @@ -74,9 +74,10 @@ case class BroadcastExchangeExec( child: SparkPlan) extends BroadcastExchangeLike { import BroadcastExchangeExec._ - private val groupId = sparkContext.getLocalProperty(SparkContext.SPARK_JOB_GROUP_ID) - - override val runId: UUID = Option(groupId).map(UUID.fromString).getOrElse(UUID.randomUUID) + // Cancelling a SQL statement from Spark ThriftServer needs to cancel + // its related broadcast sub-jobs. So set the run id to job group id if exists. + override val runId: UUID = Option(sparkContext.getLocalProperty(SparkContext.SPARK_JOB_GROUP_ID)) + .map(UUID.fromString).getOrElse(UUID.randomUUID) override lazy val metrics = Map( "dataSize" -> SQLMetrics.createSizeMetric(sparkContext, "data size"),