File tree Expand file tree Collapse file tree 4 files changed +12
-7
lines changed
main/scala/org/apache/spark/sql
test/scala/org/apache/spark/sql/test Expand file tree Collapse file tree 4 files changed +12
-7
lines changed Original file line number Diff line number Diff line change @@ -1285,6 +1285,10 @@ object SQLContext {
12851285 Option (instantiatedContext.get())
12861286 }
12871287
1288+ private [sql] def clearSqlListener (listener : SQLListener ): Unit = {
1289+ sqlListener.compareAndSet(listener, null )
1290+ }
1291+
12881292 /**
12891293 * Changes the SQLContext that will be returned in this thread and its children when
12901294 * SQLContext.getOrCreate() is called. This can be used to ensure that a given thread receives
@@ -1335,9 +1339,10 @@ object SQLContext {
13351339 private [sql] def createListenerAndUI (sc : SparkContext ): SQLListener = {
13361340 if (sqlListener.get() == null ) {
13371341 val listener = new SQLListener (sc.conf)
1338- sqlListener.compareAndSet(null , listener)
1339- sc.addSparkListener(listener)
1340- sc.ui.foreach(new SQLTab (listener, _))
1342+ if (sqlListener.compareAndSet(null , listener)) {
1343+ sc.addSparkListener(listener)
1344+ sc.ui.foreach(new SQLTab (listener, _))
1345+ }
13411346 }
13421347 sqlListener.get()
13431348 }
Original file line number Diff line number Diff line change @@ -79,7 +79,7 @@ private[sql] class LongSQLMetricValue(private var _value : Long) extends SQLMetr
7979 * A specialized long Accumulable to avoid boxing and unboxing when using Accumulator's
8080 * `+=` and `add`.
8181 */
82- private [sql] class LongSQLMetric private [metric] (name : String , param : LongSQLMetricParam )
82+ private [sql] class LongSQLMetric private [metric](name : String , param : LongSQLMetricParam )
8383 extends SQLMetric [LongSQLMetricValue , Long ](name, param) {
8484
8585 override def += (term : Long ): Unit = {
Original file line number Diff line number Diff line change @@ -21,6 +21,7 @@ import scala.collection.mutable
2121
2222import org .apache .spark .annotation .DeveloperApi
2323import org .apache .spark .scheduler ._
24+ import org .apache .spark .sql .execution .SQLExecution
2425import org .apache .spark .sql .execution .SparkPlanInfo
2526import org .apache .spark .sql .execution .metric .{LongSQLMetricValue , SQLMetricValue , SQLMetricParam }
2627import org .apache .spark .{JobExecutionStatus , Logging , SparkConf }
@@ -59,8 +60,6 @@ private[sql] class SQLEventRegister extends SparkListenerEventRegister {
5960
6061private [sql] class SQLListener (conf : SparkConf ) extends SparkListener with Logging {
6162
62- val EXECUTION_ID_KEY = " spark.sql.execution.id"
63-
6463 private val retainedExecutions = conf.getInt(" spark.sql.ui.retainedExecutions" , 1000 )
6564
6665 private val activeExecutions = mutable.HashMap [Long , SQLExecutionUIData ]()
@@ -112,7 +111,7 @@ private[sql] class SQLListener(conf: SparkConf) extends SparkListener with Loggi
112111 }
113112
114113 override def onJobStart (jobStart : SparkListenerJobStart ): Unit = {
115- val executionIdString = jobStart.properties.getProperty(EXECUTION_ID_KEY )
114+ val executionIdString = jobStart.properties.getProperty(SQLExecution . EXECUTION_ID_KEY )
116115 if (executionIdString == null ) {
117116 // This is not a job created by SQL
118117 return
Original file line number Diff line number Diff line change @@ -56,6 +56,7 @@ trait SharedSQLContext extends SQLTestUtils {
5656 try {
5757 if (_ctx != null ) {
5858 _ctx.sparkContext.stop()
59+ SQLContext .clearSqlListener(_ctx.listener)
5960 _ctx = null
6061 }
6162 } finally {
You can’t perform that action at this time.
0 commit comments