@@ -24,7 +24,7 @@ import scala.concurrent.{ExecutionContext, Promise}
 import scala.concurrent.duration.NANOSECONDS
 import scala.util.control.NonFatal

-import org.apache.spark.{broadcast, SparkException}
+import org.apache.spark.{broadcast, SparkContext, SparkException}
 import org.apache.spark.launcher.SparkLauncher
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
@@ -74,7 +74,10 @@ case class BroadcastExchangeExec(
     child: SparkPlan) extends BroadcastExchangeLike {
   import BroadcastExchangeExec._

-  override val runId: UUID = UUID.randomUUID
+  // Cancelling a SQL statement from Spark ThriftServer needs to cancel
+  // its related broadcast sub-jobs. So set the run id to job group id if exists.
+  override val runId: UUID = Option(sparkContext.getLocalProperty(SparkContext.SPARK_JOB_GROUP_ID))
cloud-fan (Contributor):
After a second thought, I think this is risky. It's possible that in a non-STS environment, users set the job group id manually and run some long-running jobs. If we capture the job group id here in the broadcast exchange, then when the broadcast times out it will cancel the whole job group, which may kill the user's other long-running jobs unexpectedly.
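
A minimal sketch of that failure mode, with an invented group id and workload (on timeout, the broadcast exchange cancels the job group named by its runId):

  import org.apache.spark.SparkContext

  def demo(sc: SparkContext): Unit = {
    // A user tags their own long-running work with a job group id.
    sc.setJobGroup("nightly-etl", "user's long-running jobs")
    // ... user jobs run under the "nightly-etl" group ...
    // If BroadcastExchangeExec captured "nightly-etl" as its runId, a
    // broadcast timeout would effectively do this, cancelling them all:
    sc.cancelJobGroup("nightly-etl")
  }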

I think we need to revisit STS's SQL statement cancellation feature. We should use the SQL execution ID to find all the jobs of a SQL query, and assign a unique job group id to them.
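
One sketch of that direction (the group-id convention here is invented; Spark already tags each job of a query with the spark.sql.execution.id local property):

  import org.apache.spark.SparkContext

  // Derive the job group id from the SQL execution id, so cancelling a
  // statement cancels exactly the jobs of that one query (broadcast
  // sub-jobs included) and never a user-defined job group.
  def withStatementGroup(sc: SparkContext, executionId: Long)(body: => Unit): Unit = {
    val groupId = s"sql-execution-$executionId" // invented convention
    sc.setJobGroup(groupId, s"SQL execution $executionId", interruptOnCancel = true)
    try body finally sc.clearJobGroup()
  }

  def cancelStatement(sc: SparkContext, executionId: Long): Unit =
    sc.cancelJobGroup(s"sql-execution-$executionId")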

cloud-fan (Contributor):
@LantaoJin what do you think?

LantaoJin (Contributor, PR author):
@cloud-fan yes, the case you describe is a problem in the current implementation. I will open a new PR. Shall we revert this first?

LantaoJin (Contributor, PR author):
The job group id is still a basic API used to cancel a group of jobs (depending on custom business logic). In a non-STS environment, users can set the job group id manually and run some long-running jobs. In some cases, such as on a custom exception, the user wants to cancel all jobs in the same job group. So a broadcast timeout shouldn't use the job group id to cancel the broadcast job.

cloud-fan (Contributor), Jan 15, 2021:
Let me revert this first. Please let me know when you have a new fix, thanks!

Member:
How about a conf (maybe named spark.jobGroupId.inherited) to decide whether the runId is re-generated or inherited from the previously specified one? Users may develop applications like ThriftServer, in a C/S architecture, as a server-like Spark program.

LantaoJin (Contributor, PR author):
Sorry, I don't get the point. If a user sets spark.jobGroupId.inherited to true and sets a custom jobGroupId to a UUID value, what's the behavior when the broadcast times out?

LantaoJin (Contributor, PR author):
Do you mean this?

  override val runId: UUID =
    if (SQLConf.get.getConf(JOB_GROUP_ID_INHERITED)) { // hypothetical entry for spark.jobGroupId.inherited
      UUID.fromString(sparkContext.getLocalProperty(SparkContext.SPARK_JOB_GROUP_ID))
    } else {
      UUID.randomUUID
    }

Member:
yea, something like this

LantaoJin (Contributor, PR author):
OK, I see. To be transparent to users, how about adding a new thread-local property, SparkContext.SPARK_RESERVED_JOB_GROUP_ID or SPARK_THRIFTSERVER_JOB_GROUP_ID, to separate them?

diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index f6e8a5694d..cc3efed713 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -760,9 +760,13 @@ class SparkContext(config: SparkConf) extends Logging {
    * may respond to Thread.interrupt() by marking nodes as dead.
    */
   def setJobGroup(groupId: String,
-      description: String, interruptOnCancel: Boolean = false): Unit = {
+      description: String, interruptOnCancel: Boolean = false, reserved: Boolean = false): Unit = {
     setLocalProperty(SparkContext.SPARK_JOB_DESCRIPTION, description)
-    setLocalProperty(SparkContext.SPARK_JOB_GROUP_ID, groupId)
+    if (reserved) {
+      setLocalProperty(SparkContext.SPARK_RESERVED_JOB_GROUP_ID, groupId)
+    } else {
+      setLocalProperty(SparkContext.SPARK_JOB_GROUP_ID, groupId)
+    }
     // Note: Specifying interruptOnCancel in setJobGroup (rather than cancelJobGroup) avoids
     // changing several public APIs and allows Spark cancellations outside of the cancelJobGroup
     // APIs to also take advantage of this property (e.g., internal job failures or canceling from
@@ -2760,6 +2764,7 @@ object SparkContext extends Logging {

   private[spark] val SPARK_JOB_DESCRIPTION = "spark.job.description"
   private[spark] val SPARK_JOB_GROUP_ID = "spark.jobGroup.id"
+  private[spark] val SPARK_RESERVED_JOB_GROUP_ID = "spark.reservedJobGroup.id"
   private[spark] val SPARK_JOB_INTERRUPT_ON_CANCEL = "spark.job.interruptOnCancel"
   private[spark] val SPARK_SCHEDULER_POOL = "spark.scheduler.pool"
   private[spark] val RDD_SCOPE_KEY = "spark.rdd.scope"
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala
index c322d5eef5..25abb4f2d3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala
@@ -76,7 +76,8 @@ case class BroadcastExchangeExec(

   // Cancelling a SQL statement from Spark ThriftServer needs to cancel
   // its related broadcast sub-jobs. So set the run id to job group id if exists.
-  override val runId: UUID = Option(sparkContext.getLocalProperty(SparkContext.SPARK_JOB_GROUP_ID))
+  override val runId: UUID =
+    Option(sparkContext.getLocalProperty(SparkContext.SPARK_RESERVED_JOB_GROUP_ID))
       .map(UUID.fromString).getOrElse(UUID.randomUUID)

   override lazy val metrics = Map(
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
index 8ca0ab91a7..4db50e8d00 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
@@ -286,7 +286,7 @@ private[hive] class SparkExecuteStatementOperation(
         parentSession.getSessionState.getConf.setClassLoader(executionHiveClassLoader)
       }

-      sqlContext.sparkContext.setJobGroup(statementId, substitutorStatement, forceCancel)
+      sqlContext.sparkContext.setJobGroup(statementId, substitutorStatement, forceCancel, true)
       result = sqlContext.sql(statement)
       logDebug(result.queryExecution.toString())
       HiveThriftServer2.eventManager.onStatementParsed(statementId,
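
If a patch along these lines were adopted, the split would look roughly like this (illustrative only; reserved is the hypothetical parameter from the sketch above, not a stock Spark API):

  import org.apache.spark.SparkContext

  def serverSide(sc: SparkContext, statementId: String, statement: String): Unit =
    // The server claims the reserved slot, invisible to user code.
    sc.setJobGroup(statementId, statement, interruptOnCancel = true, reserved = true)

  def userSide(sc: SparkContext): Unit =
    // User code keeps the ordinary SPARK_JOB_GROUP_ID, so a broadcast timeout
    // cancelling by the reserved id cannot kill user-defined job groups.
    sc.setJobGroup("nightly-etl", "user jobs")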

+    .map(UUID.fromString).getOrElse(UUID.randomUUID)

   override lazy val metrics = Map(
     "dataSize" -> SQLMetrics.createSizeMetric(sparkContext, "data size"),