Commit 9d4c7a2

FileFormatWriter should respect the input query schema
1 parent fe7b219 commit 9d4c7a2

22 files changed: +75 −76 lines changed


sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala

Lines changed: 1 addition & 1 deletion

@@ -178,7 +178,7 @@ abstract class QueryPlan[PlanType <: QueryPlan[PlanType]] extends TreeNode[PlanT
     })
   }
 
-  override def innerChildren: Seq[QueryPlan[_]] = subqueries
+  override protected def innerChildren: Seq[QueryPlan[_]] = subqueries
 
   /**
    * Returns a plan where a best effort attempt has been made to transform `this` in a way

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Command.scala

Lines changed: 1 addition & 2 deletions

@@ -24,7 +24,6 @@ import org.apache.spark.sql.catalyst.expressions.Attribute
  * commands can be used by parsers to represent DDL operations. Commands, unlike queries, are
  * eagerly executed.
  */
-trait Command extends LogicalPlan {
+trait Command extends LeafNode {
   override def output: Seq[Attribute] = Seq.empty
-  override def children: Seq[LogicalPlan] = Seq.empty
 }
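
Since `Command` now extends `LeafNode`, a concrete command no longer has to spell out an empty `children` sequence; the leaf contract supplies it. A minimal, hypothetical sketch of a command under the new trait (the `NoopCommand` name is illustrative and not part of this commit):

import org.apache.spark.sql.catalyst.plans.logical.Command

// Hypothetical illustration: `children` is fixed to Nil by LeafNode and `output`
// defaults to Seq.empty from Command, so no leaf-node boilerplate is needed.
case object NoopCommand extends Command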

sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala

Lines changed: 2 additions & 2 deletions

@@ -119,7 +119,7 @@ class QueryExecution(val sparkSession: SparkSession, val logical: LogicalPlan) {
    * `SparkSQLDriver` for CLI applications.
    */
   def hiveResultString(): Seq[String] = executedPlan match {
-    case ExecutedCommandExec(desc: DescribeTableCommand, _) =>
+    case ExecutedCommandExec(desc: DescribeTableCommand) =>
       // If it is a describe command for a Hive table, we want to have the output format
       // be similar with Hive.
       desc.run(sparkSession).map {
@@ -130,7 +130,7 @@ class QueryExecution(val sparkSession: SparkSession, val logical: LogicalPlan) {
             .mkString("\t")
       }
     // SHOW TABLES in Hive only output table names, while ours output database, table name, isTemp.
-    case command @ ExecutedCommandExec(s: ShowTablesCommand, _) if !s.isExtended =>
+    case command @ ExecutedCommandExec(s: ShowTablesCommand) if !s.isExtended =>
       command.executeCollect().map(_.getString(1))
     case other =>
       val result: Seq[Seq[Any]] = other.executeCollectPublic().map(_.toSeq).toSeq

sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala

Lines changed: 1 addition & 1 deletion

@@ -364,7 +364,7 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
   // Can we automate these 'pass through' operations?
   object BasicOperators extends Strategy {
     def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
-      case r: RunnableCommand => ExecutedCommandExec(r, r.children.map(planLater)) :: Nil
+      case r: RunnableCommand => ExecutedCommandExec(r) :: Nil
 
       case MemoryPlan(sink, output) =>
         val encoder = RowEncoder(sink.schema)

sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala

Lines changed: 2 additions & 1 deletion

@@ -62,7 +62,8 @@ case class InMemoryRelation(
     @transient var _cachedColumnBuffers: RDD[CachedBatch] = null,
     val batchStats: LongAccumulator = child.sqlContext.sparkContext.longAccumulator)
   extends logical.LeafNode with MultiInstanceRelation {
-  override def innerChildren: Seq[SparkPlan] = Seq(child)
+
+  override protected def innerChildren: Seq[SparkPlan] = Seq(child)
 
   override def producedAttributes: AttributeSet = outputSet

sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala

Lines changed: 1 addition & 1 deletion

@@ -34,7 +34,7 @@ case class InMemoryTableScanExec(
     @transient relation: InMemoryRelation)
   extends LeafExecNode {
 
-  override def innerChildren: Seq[QueryPlan[_]] = Seq(relation) ++ super.innerChildren
+  override protected def innerChildren: Seq[QueryPlan[_]] = Seq(relation) ++ super.innerChildren
 
   override lazy val metrics = Map(
     "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"))

sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala

Lines changed: 10 additions & 0 deletions

@@ -20,6 +20,7 @@ package org.apache.spark.sql.execution.command
 import org.apache.hadoop.conf.Configuration
 
 import org.apache.spark.SparkContext
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.datasources.BasicWriteJobStatsTracker
 import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
 import org.apache.spark.util.SerializableConfiguration
@@ -30,6 +31,15 @@ import org.apache.spark.util.SerializableConfiguration
  */
 trait DataWritingCommand extends RunnableCommand {
 
+  def query: LogicalPlan
+
+  // We make the input `query` an inner child instead of a child in order to hide it from the
+  // optimizer. This is because optimizer may change the output schema names, and we have to keep
+  // the original analyzed plan here so that we can pass the corrected schema to the writer. The
+  // schema of analyzed plan is what user expects(or specifies), so we should respect it when
+  // writing.
+  override protected def innerChildren: Seq[LogicalPlan] = query :: Nil
+
   override lazy val metrics: Map[String, SQLMetric] = {
     val sparkContext = SparkContext.getActive.get
     Map(
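
The comment above captures the core of this commit: `query` is kept as an inner child, so the optimizer never rewrites it and its schema still carries the names from the analyzed plan. A minimal, hypothetical sketch of a command leaning on that contract (the `DemoWriteCommand` name and body are illustrative only, not part of this commit):

package org.apache.spark.sql.execution.command

import org.apache.spark.sql.{Dataset, Row, SparkSession}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

// Hypothetical sketch: because `query` is an inner child, the analyzed plan (and the
// column names the user wrote or specified) survives optimization of the command itself.
case class DemoWriteCommand(query: LogicalPlan, path: String) extends DataWritingCommand {
  override def run(sparkSession: SparkSession): Seq[Row] = {
    // The schema the user expects, taken from the analyzed `query`, not from an
    // optimized or physical plan whose attribute names may have been rewritten.
    val expectedSchema = query.schema
    // Execute the query through the session (it is optimized independently here) and
    // hand the rows plus `expectedSchema` to the file writer (the writing itself is elided).
    val df = Dataset.ofRows(sparkSession, query)
    assert(df.schema.length == expectedSchema.length)  // same arity; names come from `expectedSchema`
    Seq.empty[Row]
  }
}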

sql/core/src/main/scala/org/apache/spark/sql/execution/command/InsertIntoDataSourceDirCommand.scala

Lines changed: 2 additions & 4 deletions

@@ -21,7 +21,6 @@ import org.apache.spark.SparkException
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.execution.datasources._
 
 /**
@@ -45,10 +44,9 @@ case class InsertIntoDataSourceDirCommand(
     query: LogicalPlan,
     overwrite: Boolean) extends RunnableCommand {
 
-  override def children: Seq[LogicalPlan] = Seq(query)
+  override protected def innerChildren: Seq[LogicalPlan] = query :: Nil
 
-  override def run(sparkSession: SparkSession, children: Seq[SparkPlan]): Seq[Row] = {
-    assert(children.length == 1)
+  override def run(sparkSession: SparkSession): Seq[Row] = {
     assert(storage.locationUri.nonEmpty, "Directory path is required")
     assert(provider.nonEmpty, "Data source is required")

sql/core/src/main/scala/org/apache/spark/sql/execution/command/cache.scala

Lines changed: 1 addition & 1 deletion

@@ -30,7 +30,7 @@ case class CacheTableCommand(
   require(plan.isEmpty || tableIdent.database.isEmpty,
     "Database name is not allowed in CACHE TABLE AS SELECT")
 
-  override def innerChildren: Seq[QueryPlan[_]] = plan.toSeq
+  override protected def innerChildren: Seq[QueryPlan[_]] = plan.toSeq
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
     plan.foreach { logicalPlan =>

sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala

Lines changed: 9 additions & 21 deletions

@@ -24,9 +24,9 @@ import org.apache.spark.sql.{Row, SparkSession}
 import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
 import org.apache.spark.sql.catalyst.errors.TreeNodeException
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
-import org.apache.spark.sql.catalyst.plans.{logical, QueryPlan}
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.catalyst.plans.QueryPlan
+import org.apache.spark.sql.catalyst.plans.logical.{Command, LogicalPlan}
+import org.apache.spark.sql.execution.LeafExecNode
 import org.apache.spark.sql.execution.debug._
 import org.apache.spark.sql.execution.metric.SQLMetric
 import org.apache.spark.sql.execution.streaming.{IncrementalExecution, OffsetSeqMetadata}
@@ -37,29 +37,22 @@ import org.apache.spark.sql.types._
  * A logical command that is executed for its side-effects. `RunnableCommand`s are
 * wrapped in `ExecutedCommand` during execution.
 */
-trait RunnableCommand extends logical.Command {
+trait RunnableCommand extends Command {
 
   // The map used to record the metrics of running the command. This will be passed to
   // `ExecutedCommand` during query planning.
   lazy val metrics: Map[String, SQLMetric] = Map.empty
 
-  def run(sparkSession: SparkSession, children: Seq[SparkPlan]): Seq[Row] = {
-    throw new NotImplementedError
-  }
-
-  def run(sparkSession: SparkSession): Seq[Row] = {
-    throw new NotImplementedError
-  }
+  def run(sparkSession: SparkSession): Seq[Row]
 }
 
 /**
  * A physical operator that executes the run method of a `RunnableCommand` and
 * saves the result to prevent multiple executions.
 *
 * @param cmd the `RunnableCommand` this operator will run.
- * @param children the children physical plans ran by the `RunnableCommand`.
 */
-case class ExecutedCommandExec(cmd: RunnableCommand, children: Seq[SparkPlan]) extends SparkPlan {
+case class ExecutedCommandExec(cmd: RunnableCommand) extends LeafExecNode {
 
   override lazy val metrics: Map[String, SQLMetric] = cmd.metrics
 
@@ -74,19 +67,14 @@ case class ExecutedCommandExec(cmd: RunnableCommand, children: Seq[SparkPlan]) e
    */
   protected[sql] lazy val sideEffectResult: Seq[InternalRow] = {
     val converter = CatalystTypeConverters.createToCatalystConverter(schema)
-    val rows = if (children.isEmpty) {
-      cmd.run(sqlContext.sparkSession)
-    } else {
-      cmd.run(sqlContext.sparkSession, children)
-    }
-    rows.map(converter(_).asInstanceOf[InternalRow])
+    cmd.run(sqlContext.sparkSession).map(converter(_).asInstanceOf[InternalRow])
   }
 
-  override def innerChildren: Seq[QueryPlan[_]] = cmd.innerChildren
+  override protected def innerChildren: Seq[QueryPlan[_]] = cmd :: Nil
 
   override def output: Seq[Attribute] = cmd.output
 
-  override def nodeName: String = cmd.nodeName
+  override def nodeName: String = "Execute " + cmd.nodeName
 
   override def executeCollect(): Array[InternalRow] = sideEffectResult.toArray
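
With the two overloads collapsed into a single abstract `run`, every `RunnableCommand` must implement it, and the planner simply wraps the command in a leaf `ExecutedCommandExec`; the command shows up as an inner child in the plan string rather than as an executed child plan. A hypothetical minimal command under the new contract (`PrintVersionCommand` is illustrative, not part of this commit):

import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
import org.apache.spark.sql.execution.command.RunnableCommand
import org.apache.spark.sql.types.StringType

// Hypothetical sketch: only run(sparkSession) exists now, so a command that forgets to
// implement it fails to compile instead of hitting NotImplementedError at runtime.
case class PrintVersionCommand() extends RunnableCommand {
  override val output: Seq[Attribute] =
    Seq(AttributeReference("version", StringType, nullable = false)())

  override def run(sparkSession: SparkSession): Seq[Row] =
    Seq(Row(sparkSession.version))
}

// Planning goes through BasicOperators: case r: RunnableCommand => ExecutedCommandExec(r) :: Nil
// and the resulting plan node renders as "Execute PrintVersionCommand", since nodeName now
// prefixes the wrapped command's name with "Execute ".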
