core/src/main/scala/org/apache/spark/SparkContext.scala (5 additions, 0 deletions)
@@ -2765,6 +2765,11 @@ object SparkContext extends Logging {
   private[spark] val RDD_SCOPE_KEY = "spark.rdd.scope"
   private[spark] val RDD_SCOPE_NO_OVERRIDE_KEY = "spark.rdd.scope.noOverride"

+  /**
+   * The statement id is only used by the Thrift server.
+   */
+  private[spark] val SPARK_STATEMENT_ID = "spark.statement.id"
+
Review comment (Contributor) on the SPARK_STATEMENT_ID line:
> We can define it in the sql/core module, as it's only used there and in the STS module, probably in the SQLExecution object.
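A minimal sketch of that suggestion, assuming the key is moved next to the existing EXECUTION_ID_KEY in the SQLExecution object in sql/core (illustrative placement only, not part of this patch):

    // sql/core: org.apache.spark.sql.execution.SQLExecution
    object SQLExecution {
      val EXECUTION_ID_KEY = "spark.sql.execution.id" // existing key in this object

      // Hypothetical new home for the statement id key: both sql/core and the
      // hive-thriftserver (STS) module can reference it from here, so
      // SparkContext stays untouched.
      val SPARK_STATEMENT_ID = "spark.statement.id"
    }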


   /**
    * Executor id for the driver. In earlier versions of Spark, this was `<driver>`, but this was
    * changed to `driver` because the angle brackets caused escaping issues in URLs and XML (see
sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala (4 additions, 2 deletions)

@@ -24,7 +24,7 @@ import scala.concurrent.{ExecutionContext, Promise}
 import scala.concurrent.duration.NANOSECONDS
 import scala.util.control.NonFatal

-import org.apache.spark.{broadcast, SparkException}
+import org.apache.spark.{broadcast, SparkContext, SparkException}
 import org.apache.spark.launcher.SparkLauncher
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
@@ -74,7 +74,9 @@ case class BroadcastExchangeExec(
     child: SparkPlan) extends BroadcastExchangeLike {
   import BroadcastExchangeExec._

-  override val runId: UUID = UUID.randomUUID
+  // runId must be a UUID. We set it to statementId if defined.
+  override val runId: UUID = Option(sparkContext.getLocalProperty(SparkContext.SPARK_STATEMENT_ID))
+    .map(UUID.fromString).getOrElse(UUID.randomUUID)

   override lazy val metrics = Map(
     "dataSize" -> SQLMetrics.createSizeMetric(sparkContext, "data size"),
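One caveat: UUID.fromString throws IllegalArgumentException if the local property holds a string that is not a well-formed UUID, so this change relies on the Thrift server always setting spark.statement.id to a UUID string. A defensive sketch that falls back to a random id on a bad value (guardedRunId is a hypothetical helper, not part of this patch):

    import java.util.UUID
    import scala.util.Try

    // Parse the statement id when present and well-formed; otherwise fall
    // back to a fresh random UUID instead of throwing at plan construction.
    def guardedRunId(statementId: Option[String]): UUID =
      statementId.flatMap(s => Try(UUID.fromString(s)).toOption)
        .getOrElse(UUID.randomUUID)

    // guardedRunId(Some("123e4567-e89b-12d3-a456-426614174000")) // parsed id
    // guardedRunId(Some("not-a-uuid"))                           // random fallback
    // guardedRunId(None)                                         // random fallback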
sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala (12 additions, 1 deletion)

@@ -31,6 +31,7 @@ import org.apache.hive.service.cli._
 import org.apache.hive.service.cli.operation.ExecuteStatementOperation
 import org.apache.hive.service.cli.session.HiveSession

+import org.apache.spark.SparkContext
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{DataFrame, Row => SparkRow, SQLContext}
 import org.apache.spark.sql.execution.HiveResult.{getTimeFormatters, toHiveString, TimeFormatters}
@@ -131,6 +132,7 @@ private[hive] class SparkExecuteStatementOperation(
       getNextRowSetInternal(order, maxRowsL)
     } finally {
       sqlContext.sparkContext.clearJobGroup()
+      clearStatementId()
     }
   }

@@ -285,7 +287,7 @@
       if (!runInBackground) {
         parentSession.getSessionState.getConf.setClassLoader(executionHiveClassLoader)
       }
-
+      setStatementId()
       sqlContext.sparkContext.setJobGroup(statementId, substitutorStatement, forceCancel)
       result = sqlContext.sql(statement)
       logDebug(result.queryExecution.toString())
@@ -332,6 +334,7 @@
         }
       }
       sqlContext.sparkContext.clearJobGroup()
+      clearStatementId()
     }
   }

@@ -369,6 +372,14 @@
       sqlContext.sparkContext.cancelJobGroup(statementId)
     }
   }
+
+  private def setStatementId(): Unit = {
+    sqlContext.sparkContext.setLocalProperty(SparkContext.SPARK_STATEMENT_ID, statementId)
+  }
+
+  private def clearStatementId(): Unit = {
+    sqlContext.sparkContext.setLocalProperty(SparkContext.SPARK_STATEMENT_ID, null)
+  }
 }

 object SparkExecuteStatementOperation {
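For context on why the helpers above work: SparkContext local properties are per-thread on the driver and are attached to every job submitted from that thread, so a BroadcastExchangeExec built while the statement runs can read the id back with getLocalProperty, and setting the property to null clears it. A small standalone sketch of these semantics, assuming a local SparkSession (the key string is the one this patch defines):

    import java.util.UUID
    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().master("local[2]").appName("demo").getOrCreate()
    val sc = spark.sparkContext

    // Mirrors setStatementId(): the property rides along with jobs from this thread.
    sc.setLocalProperty("spark.statement.id", UUID.randomUUID().toString)
    println(sc.getLocalProperty("spark.statement.id")) // the UUID string

    // Mirrors clearStatementId(): null removes the property.
    sc.setLocalProperty("spark.statement.id", null)
    println(sc.getLocalProperty("spark.statement.id")) // null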