Commit f63b773

SPARK-20213: Fix more tests with nested SQL executions.
1 parent 90045cf commit f63b773
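
The changes share one pattern: commands and sinks that internally build and run their own Dataset (the ANALYZE commands, insert/save into a data source, CreateTempViewUsing, the console sink, TestHive test-table loading) can be called while an outer execution already holds the execution-id local property, which trips the "$EXECUTION_ID_KEY is already set" check in SQLExecution. Each such call site now wraps its internal action in SQLExecution.nested(sparkSession) { ... } to mark the inner execution as intentionally nested, and KafkaWriter drops its own withNewExecutionId wrapper instead. Below is a minimal sketch of the call-site pattern applied throughout the diffs; the command and its query field are illustrative, not taken from the commit:

  // Illustrative RunnableCommand showing the wrapping pattern used in this commit.
  case class ExampleCommand(query: LogicalPlan) extends RunnableCommand {
    override def run(sparkSession: SparkSession): Seq[Row] = {
      // The count() action would normally claim a fresh execution id; nested(...)
      // tells SQLExecution that running inside an outer execution is expected here.
      SQLExecution.nested(sparkSession) {
        Dataset.ofRows(sparkSession, query).count()
      }
      Seq.empty[Row]
    }
  }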

File tree: 10 files changed (+50 lines, -31 lines)


external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriter.scala

Lines changed: 4 additions & 6 deletions

@@ -86,12 +86,10 @@ private[kafka010] object KafkaWriter extends Logging {
       topic: Option[String] = None): Unit = {
     val schema = queryExecution.analyzed.output
     validateQuery(queryExecution, kafkaParameters, topic)
-    SQLExecution.withNewExecutionId(sparkSession, queryExecution) {
-      queryExecution.toRdd.foreachPartition { iter =>
-        val writeTask = new KafkaWriteTask(kafkaParameters, schema, topic)
-        Utils.tryWithSafeFinally(block = writeTask.execute(iter))(
-          finallyBlock = writeTask.close())
-      }
+    queryExecution.toRdd.foreachPartition { iter =>
+      val writeTask = new KafkaWriteTask(kafkaParameters, schema, topic)
+      Utils.tryWithSafeFinally(block = writeTask.execute(iter))(
+        finallyBlock = writeTask.close())
     }
   }
 }

sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala

Lines changed: 1 addition & 1 deletion

@@ -122,7 +122,7 @@ object SQLExecution extends Logging {
     // To avoid this warning, use nested { ... }
     if (!Option(sc.getLocalProperty(ALLOW_NESTED_EXECUTION)).exists(_.toBoolean)) {
       if (testing) {
-        throw new IllegalArgumentException(s"$EXECUTION_ID_KEY is already set")
+        throw new IllegalArgumentException(s"$EXECUTION_ID_KEY is already set: $oldExecutionId")
       } else {
         logWarning(s"$EXECUTION_ID_KEY is already set")
       }
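
The nested helper itself is not shown in this commit, but the check above suggests its shape: it should flag the current thread as explicitly allowing a nested execution (the ALLOW_NESTED_EXECUTION local property) while its body runs. A rough sketch under that assumption; the save/restore of the property is guessed here, not the committed implementation:

  // Hypothetical sketch of SQLExecution.nested, inferred from the check above.
  def nested[T](sparkSession: SparkSession)(body: => T): T = {
    val sc = sparkSession.sparkContext
    val previous = sc.getLocalProperty(ALLOW_NESTED_EXECUTION)  // remember the caller's setting
    sc.setLocalProperty(ALLOW_NESTED_EXECUTION, "true")         // permit a nested execution id
    try {
      body
    } finally {
      sc.setLocalProperty(ALLOW_NESTED_EXECUTION, previous)     // restore the previous value (may be null)
    }
  }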

sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala

Lines changed: 4 additions & 1 deletion

@@ -23,6 +23,7 @@ import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, CatalogTableTyp
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.aggregate._
 import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.execution.SQLExecution


 /**
@@ -96,7 +97,9 @@ case class AnalyzeColumnCommand(
       attributesToAnalyze.map(ColumnStat.statExprs(_, ndvMaxErr))

     val namedExpressions = expressions.map(e => Alias(e, e.toString)())
-    val statsRow = Dataset.ofRows(sparkSession, Aggregate(Nil, namedExpressions, relation)).head()
+    val statsRow = SQLExecution.nested(sparkSession) {
+      Dataset.ofRows(sparkSession, Aggregate(Nil, namedExpressions, relation)).head()
+    }

     val rowCount = statsRow.getLong(0)
     val columnStats = attributesToAnalyze.zipWithIndex.map { case (attr, i) =>

sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala

Lines changed: 4 additions & 1 deletion

@@ -25,6 +25,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, CatalogTable, CatalogTableType}
+import org.apache.spark.sql.execution.SQLExecution
 import org.apache.spark.sql.internal.SessionState


@@ -56,7 +57,9 @@ case class AnalyzeTableCommand(
     // 2. when total size is changed, `oldRowCount` becomes invalid.
     // This is to make sure that we only record the right statistics.
     if (!noscan) {
-      val newRowCount = sparkSession.table(tableIdentWithDB).count()
+      val newRowCount = SQLExecution.nested(sparkSession) {
+        sparkSession.table(tableIdentWithDB).count()
+      }
       if (newRowCount >= 0 && newRowCount != oldRowCount) {
         newStats = if (newStats.isDefined) {
           newStats.map(_.copy(rowCount = Some(BigInt(newRowCount))))

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoDataSourceCommand.scala

Lines changed: 13 additions & 8 deletions

@@ -20,6 +20,7 @@ package org.apache.spark.sql.execution.datasources
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.plans.QueryPlan
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.SQLExecution
 import org.apache.spark.sql.execution.command.RunnableCommand
 import org.apache.spark.sql.sources.InsertableRelation

@@ -37,14 +38,18 @@ case class InsertIntoDataSourceCommand(

   override def run(sparkSession: SparkSession): Seq[Row] = {
     val relation = logicalRelation.relation.asInstanceOf[InsertableRelation]
-    val data = Dataset.ofRows(sparkSession, query)
-    // Apply the schema of the existing table to the new data.
-    val df = sparkSession.internalCreateDataFrame(data.queryExecution.toRdd, logicalRelation.schema)
-    relation.insert(df, overwrite)
-
-    // Re-cache all cached plans(including this relation itself, if it's cached) that refer to this
-    // data source relation.
-    sparkSession.sharedState.cacheManager.recacheByPlan(sparkSession, logicalRelation)
+    SQLExecution.nested(sparkSession) {
+      val data = Dataset.ofRows(sparkSession, query)
+
+      // Apply the schema of the existing table to the new data.
+      val df = sparkSession.internalCreateDataFrame(
+        data.queryExecution.toRdd, logicalRelation.schema)
+      relation.insert(df, overwrite)
+
+      // Re-cache all cached plans(including this relation itself, if it's cached) that refer to
+      // this data source relation.
+      sparkSession.sharedState.cacheManager.recacheByPlan(sparkSession, logicalRelation)
+    }

     Seq.empty[Row]
   }

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SaveIntoDataSourceCommand.scala

Lines changed: 8 additions & 6 deletions

@@ -20,6 +20,7 @@ package org.apache.spark.sql.execution.datasources
 import org.apache.spark.sql.{Dataset, Row, SaveMode, SparkSession}
 import org.apache.spark.sql.catalyst.plans.QueryPlan
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.SQLExecution
 import org.apache.spark.sql.execution.command.RunnableCommand

 /**
@@ -41,12 +42,13 @@ case class SaveIntoDataSourceCommand(
   override protected def innerChildren: Seq[QueryPlan[_]] = Seq(query)

   override def run(sparkSession: SparkSession): Seq[Row] = {
-    DataSource(
-      sparkSession,
-      className = provider,
-      partitionColumns = partitionColumns,
-      options = options).write(mode, Dataset.ofRows(sparkSession, query))
-
+    SQLExecution.nested(sparkSession) {
+      DataSource(
+        sparkSession,
+        className = provider,
+        partitionColumns = partitionColumns,
+        options = options).write(mode, Dataset.ofRows(sparkSession, query))
+    }
     Seq.empty[Row]
   }
 }

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala

Lines changed: 4 additions & 2 deletions

@@ -24,6 +24,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogUtils}
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.SQLExecution
 import org.apache.spark.sql.execution.command.{DDLUtils, RunnableCommand}
 import org.apache.spark.sql.types._

@@ -89,8 +90,9 @@ case class CreateTempViewUsing(
       options = options)

     val catalog = sparkSession.sessionState.catalog
-    val viewDefinition = Dataset.ofRows(
-      sparkSession, LogicalRelation(dataSource.resolveRelation())).logicalPlan
+    val viewDefinition = SQLExecution.nested(sparkSession) {
+      Dataset.ofRows(sparkSession, LogicalRelation(dataSource.resolveRelation())).logicalPlan
+    }

     if (global) {
       catalog.createGlobalTempView(tableIdent.table, viewDefinition, replace)

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/console.scala

Lines changed: 6 additions & 3 deletions

@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.streaming

 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{DataFrame, SQLContext}
+import org.apache.spark.sql.execution.SQLExecution
 import org.apache.spark.sql.sources.{DataSourceRegister, StreamSinkProvider}
 import org.apache.spark.sql.streaming.OutputMode

@@ -45,9 +46,11 @@ class ConsoleSink(options: Map[String, String]) extends Sink with Logging {
     println(batchIdStr)
     println("-------------------------------------------")
     // scalastyle:off println
-    data.sparkSession.createDataFrame(
-      data.sparkSession.sparkContext.parallelize(data.collect()), data.schema)
-      .show(numRowsToShow, isTruncated)
+    SQLExecution.nested(data.sparkSession) {
+      data.sparkSession.createDataFrame(
+        data.sparkSession.sparkContext.parallelize(data.collect()), data.schema)
+        .show(numRowsToShow, isTruncated)
+    }
   }
 }

sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala

Lines changed: 5 additions & 2 deletions

@@ -35,7 +35,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{SparkSession, SQLContext}
 import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.execution.QueryExecution
+import org.apache.spark.sql.execution.{QueryExecution, SQLExecution}
 import org.apache.spark.sql.execution.command.CacheTableCommand
 import org.apache.spark.sql.hive._
 import org.apache.spark.sql.hive.client.HiveClient
@@ -552,7 +552,10 @@ private[hive] class TestHiveQueryExecution(
       logical.collect { case UnresolvedRelation(tableIdent) => tableIdent.table }
     val referencedTestTables = referencedTables.filter(sparkSession.testTables.contains)
     logDebug(s"Query references test tables: ${referencedTestTables.mkString(", ")}")
-    referencedTestTables.foreach(sparkSession.loadTestTable)
+    // this lazy value may be computed inside another SQLExecution.withNewExecutionId block
+    SQLExecution.nested(sparkSession) {
+      referencedTestTables.foreach(sparkSession.loadTestTable)
+    }
     // Proceed with analysis.
     sparkSession.sessionState.analyzer.execute(logical)
   }

sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala

Lines changed: 1 addition & 1 deletion

@@ -965,7 +965,7 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
   }

   test("sanity test for SPARK-6618") {
-    (1 to 100).par.map { i =>
+    (1 to 100).map { i =>
       val tableName = s"SPARK_6618_table_$i"
       sql(s"CREATE TABLE $tableName (col1 string)")
       sessionState.catalog.lookupRelation(TableIdentifier(tableName))
