Skip to content

Commit 1adaf6e

Browse files
author
wangzhenhua
committed
run command with new execution Id except insert table commands
1 parent 2269155 commit 1adaf6e

File tree

4 files changed

+17
-10
lines changed

4 files changed

+17
-10
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ import org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker
2828
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, ReturnAnswer}
2929
import org.apache.spark.sql.catalyst.rules.Rule
3030
import org.apache.spark.sql.catalyst.util.DateTimeUtils
31-
import org.apache.spark.sql.execution.command.{DescribeTableCommand, ExecutedCommandExec, ShowTablesCommand}
31+
import org.apache.spark.sql.execution.command.{DescribeTableCommand, ExecutedCommandExec, InsertTableCommand, ShowTablesCommand}
3232
import org.apache.spark.sql.execution.exchange.{EnsureRequirements, ReuseExchange}
3333
import org.apache.spark.sql.types.{BinaryType, DateType, DecimalType, TimestampType, _}
3434
import org.apache.spark.util.Utils
@@ -114,7 +114,7 @@ class QueryExecution(val sparkSession: SparkSession, val logical: LogicalPlan) {
114114

115115

116116
/**
117-
* Returns the result as a hive compatible sequence of strings. This is for testing only.
117+
* Returns the result as a hive compatible sequence of strings.
118118
*/
119119
def hiveResultString(): Seq[String] = executedPlan match {
120120
case ExecutedCommandExec(desc: DescribeTableCommand) =>
@@ -130,12 +130,17 @@ class QueryExecution(val sparkSession: SparkSession, val logical: LogicalPlan) {
130130
// SHOW TABLES in Hive only output table names, while ours output database, table name, isTemp.
131131
case command @ ExecutedCommandExec(s: ShowTablesCommand) if !s.isExtended =>
132132
command.executeCollect().map(_.getString(1))
133+
case insertCommand @ ExecutedCommandExec(_: InsertTableCommand) =>
134+
// Insert command will start a new execution through FileFormatWriter
135+
insertCommand.executeCollect().map(_.toString)
133136
case other =>
134-
val result: Seq[Seq[Any]] = other.executeCollectPublic().map(_.toSeq).toSeq
135-
// We need the types so we can output struct field names
136-
val types = analyzed.output.map(_.dataType)
137-
// Reformat to match hive tab delimited output.
138-
result.map(_.zip(types).map(toHiveString)).map(_.mkString("\t"))
137+
SQLExecution.withNewExecutionId(sparkSession, this) {
138+
val result: Seq[Seq[Any]] = other.executeCollectPublic().map(_.toSeq).toSeq
139+
// We need the types so we can output struct field names
140+
val types = analyzed.output.map(_.dataType)
141+
// Reformat to match hive tab delimited output.
142+
result.map(_.zip(types).map(toHiveString)).map(_.mkString("\t"))
143+
}
139144
}
140145

141146
/** Formats a datum (based on the given data type) and returns the string representation. */

sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1025,3 +1025,5 @@ case class ShowCreateTableCommand(table: TableIdentifier) extends RunnableComman
10251025
builder.toString()
10261026
}
10271027
}
1028+
1029+
trait InsertTableCommand extends RunnableCommand

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ case class InsertIntoHadoopFsRelationCommand(
4949
mode: SaveMode,
5050
catalogTable: Option[CatalogTable],
5151
fileIndex: Option[FileIndex])
52-
extends RunnableCommand {
52+
extends InsertTableCommand {
5353

5454
import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils.escapePathName
5555

sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ import org.apache.spark.sql.{AnalysisException, Dataset, Row, SparkSession}
3636
import org.apache.spark.sql.catalyst.catalog.CatalogTable
3737
import org.apache.spark.sql.catalyst.expressions.Attribute
3838
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
39-
import org.apache.spark.sql.execution.command.RunnableCommand
39+
import org.apache.spark.sql.execution.command.InsertTableCommand
4040
import org.apache.spark.sql.execution.datasources.FileFormatWriter
4141
import org.apache.spark.sql.hive._
4242
import org.apache.spark.sql.hive.HiveShim.{ShimFileSinkDesc => FileSinkDesc}
@@ -78,7 +78,7 @@ case class InsertIntoHiveTable(
7878
partition: Map[String, Option[String]],
7979
query: LogicalPlan,
8080
overwrite: Boolean,
81-
ifNotExists: Boolean) extends RunnableCommand {
81+
ifNotExists: Boolean) extends InsertTableCommand {
8282

8383
override protected def innerChildren: Seq[LogicalPlan] = query :: Nil
8484

0 commit comments

Comments
 (0)