
Commit d4f0b1d

gengliangwang authored and cloud-fan committed
[SPARK-22834][SQL] Make insertion commands have real children to fix UI issues
## What changes were proposed in this pull request?

With #19474, children of insertion commands are missing in the UI. To fix it:

1. Create a new physical plan `DataWritingCommandExec` to execute `DataWritingCommand` with children, so that the other commands won't be affected.
2. On creation of a `DataWritingCommand`, a new field `allColumns` must be specified, which is the output of the analyzed plan.
3. In `FileFormatWriter`, the output schema will use `allColumns` instead of the output of the optimized plan.

Before code changes:

![2017-12-19 10 27 10](https://user-images.githubusercontent.com/1097932/34161850-d2fd0acc-e50c-11e7-898a-177154fe7d8e.png)

After code changes:

![2017-12-19 10 27 04](https://user-images.githubusercontent.com/1097932/34161865-de23de26-e50c-11e7-9131-0c32f7b7b749.png)

## How was this patch tested?

Unit test.

Author: Wang Gengliang <[email protected]>

Closes #20020 from gengliangwang/insert.
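To see the effect locally, here is a minimal sketch, assuming a Spark build that includes this patch; the app name and output path are placeholders, not part of the change. With the patch, the write plans as `DataWritingCommandExec` ("Execute InsertIntoHadoopFsRelationCommand" in the SQL tab) with the query attached as a real child instead of a childless command node.

```scala
// Minimal sketch, assuming a local Spark build that includes this patch.
import org.apache.spark.sql.SparkSession

object InsertChildDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("SPARK-22834-demo")   // placeholder app name
      .getOrCreate()
    import spark.implicits._

    val df = Seq((1, "a"), (2, "b")).toDF("id", "name")
    // Triggers DataSource.planForWriting; with this patch the SQL tab shows the
    // LocalTableScan/Project child under "Execute InsertIntoHadoopFsRelationCommand"
    // instead of a leaf command node.
    df.write.mode("overwrite").parquet("/tmp/spark-22834-demo")   // placeholder path

    spark.stop()
  }
}
```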
1 parent 67ea11e commit d4f0b1d

File tree

14 files changed: +123 -64 lines changed


sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala

Lines changed: 2 additions & 2 deletions
```diff
@@ -26,7 +26,7 @@ import org.apache.spark.annotation.InterfaceStability
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.{EliminateSubqueryAliases, UnresolvedRelation}
 import org.apache.spark.sql.catalyst.catalog._
-import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan}
+import org.apache.spark.sql.catalyst.plans.logical.{AnalysisBarrier, InsertIntoTable, LogicalPlan}
 import org.apache.spark.sql.execution.SQLExecution
 import org.apache.spark.sql.execution.command.DDLUtils
 import org.apache.spark.sql.execution.datasources.{CreateTable, DataSource, LogicalRelation}
@@ -264,7 +264,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
         sparkSession = df.sparkSession,
         className = source,
         partitionColumns = partitioningColumns.getOrElse(Nil),
-        options = extraOptions.toMap).planForWriting(mode, df.logicalPlan)
+        options = extraOptions.toMap).planForWriting(mode, AnalysisBarrier(df.logicalPlan))
       }
     }
   }
```
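A note on the `AnalysisBarrier` wrapping above: because a `DataWritingCommand` now keeps its query as a real child, the analyzer would otherwise walk into `df.logicalPlan` again when the write plan goes through analysis. Wrapping the already-analyzed plan in `AnalysisBarrier` (a Spark 2.3-era Catalyst node) marks it as analyzed so it is passed through untouched. A minimal sketch of the wrapping, assuming an already-analyzed `analyzedPlan`:

```scala
import org.apache.spark.sql.catalyst.plans.logical.{AnalysisBarrier, LogicalPlan}

// The barrier simply wraps a plan the caller guarantees is analyzed; the analyzer
// skips over it instead of resolving the subtree a second time.
def protectFromReanalysis(analyzedPlan: LogicalPlan): LogicalPlan =
  AnalysisBarrier(analyzedPlan)
```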

sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala

Lines changed: 1 addition & 0 deletions
```diff
@@ -403,6 +403,7 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
   // Can we automate these 'pass through' operations?
   object BasicOperators extends Strategy {
     def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
+      case d: DataWritingCommand => DataWritingCommandExec(d, planLater(d.query)) :: Nil
       case r: RunnableCommand => ExecutedCommandExec(r) :: Nil

       case MemoryPlan(sink, output) =>
```
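As a hedged illustration of what the new case produces (table and view names below are hypothetical and assume a spark-shell session where a parquet table `t` and a view `src` already exist): an INSERT now plans as `DataWritingCommandExec` with `planLater(d.query)` as its physical child, rather than as a leaf `ExecutedCommandExec`.

```scala
// Hypothetical spark-shell session; only the plan shape is the point.
val inserted = spark.sql("INSERT INTO t SELECT id FROM src")
inserted.queryExecution.executedPlan.foreach(node => println(node.nodeName))
// Roughly expected shape with this patch:
//   Execute InsertIntoHadoopFsRelationCommand
//   <planned child: Project / Range / Scan ...>   <- now visible in the plan and the UI
```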

sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala

Lines changed: 16 additions & 12 deletions
```diff
@@ -20,30 +20,32 @@ package org.apache.spark.sql.execution.command
 import org.apache.hadoop.conf.Configuration

 import org.apache.spark.SparkContext
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.{Row, SparkSession}
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.plans.logical.{Command, LogicalPlan}
+import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.execution.datasources.BasicWriteJobStatsTracker
+import org.apache.spark.sql.execution.datasources.FileFormatWriter
 import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
 import org.apache.spark.util.SerializableConfiguration

-
 /**
- * A special `RunnableCommand` which writes data out and updates metrics.
+ * A special `Command` which writes data out and updates metrics.
  */
-trait DataWritingCommand extends RunnableCommand {
-
+trait DataWritingCommand extends Command {
   /**
    * The input query plan that produces the data to be written.
+   * IMPORTANT: the input query plan MUST be analyzed, so that we can carry its output columns
+   * to [[FileFormatWriter]].
    */
   def query: LogicalPlan

-  // We make the input `query` an inner child instead of a child in order to hide it from the
-  // optimizer. This is because optimizer may not preserve the output schema names' case, and we
-  // have to keep the original analyzed plan here so that we can pass the corrected schema to the
-  // writer. The schema of analyzed plan is what user expects(or specifies), so we should respect
-  // it when writing.
-  override protected def innerChildren: Seq[LogicalPlan] = query :: Nil
+  override final def children: Seq[LogicalPlan] = query :: Nil

-  override lazy val metrics: Map[String, SQLMetric] = {
+  // Output columns of the analyzed input query plan
+  def outputColumns: Seq[Attribute]
+
+  lazy val metrics: Map[String, SQLMetric] = {
     val sparkContext = SparkContext.getActive.get
     Map(
       "numFiles" -> SQLMetrics.createMetric(sparkContext, "number of written files"),
@@ -57,4 +59,6 @@ trait DataWritingCommand extends RunnableCommand {
     val serializableHadoopConf = new SerializableConfiguration(hadoopConf)
     new BasicWriteJobStatsTracker(serializableHadoopConf, metrics)
   }
+
+  def run(sparkSession: SparkSession, child: SparkPlan): Seq[Row]
 }
```
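To make the new contract concrete, here is a hedged sketch of a hypothetical command written against the trait as changed above; the class name and its body are illustrative, not part of the patch. A concrete `DataWritingCommand` now carries the analyzed query, the analyzed output columns, and receives the already-planned physical child in `run`.

```scala
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.command.DataWritingCommand

// Hypothetical command: it does not write any files, it only shows the contract.
case class NoopWriteCommand(
    query: LogicalPlan,              // must be the *analyzed* plan, per the new doc comment
    outputColumns: Seq[Attribute])   // output of the analyzed plan, carried to the writer
  extends DataWritingCommand {

  override def run(sparkSession: SparkSession, child: SparkPlan): Seq[Row] = {
    // A real command would hand `child` and `outputColumns` to FileFormatWriter.write;
    // this sketch just materializes the planned child once.
    child.execute().count()
    Seq.empty[Row]
  }
}
```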

sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala

Lines changed: 37 additions & 1 deletion
```diff
@@ -26,7 +26,7 @@ import org.apache.spark.sql.catalyst.errors.TreeNodeException
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
 import org.apache.spark.sql.catalyst.plans.QueryPlan
 import org.apache.spark.sql.catalyst.plans.logical.{Command, LogicalPlan}
-import org.apache.spark.sql.execution.LeafExecNode
+import org.apache.spark.sql.execution.{LeafExecNode, SparkPlan}
 import org.apache.spark.sql.execution.debug._
 import org.apache.spark.sql.execution.metric.SQLMetric
 import org.apache.spark.sql.execution.streaming.{IncrementalExecution, OffsetSeqMetadata}
@@ -87,6 +87,42 @@ case class ExecutedCommandExec(cmd: RunnableCommand) extends LeafExecNode {
   }
 }

+/**
+ * A physical operator that executes the run method of a `DataWritingCommand` and
+ * saves the result to prevent multiple executions.
+ *
+ * @param cmd the `DataWritingCommand` this operator will run.
+ * @param child the physical plan child ran by the `DataWritingCommand`.
+ */
+case class DataWritingCommandExec(cmd: DataWritingCommand, child: SparkPlan)
+  extends SparkPlan {
+
+  override lazy val metrics: Map[String, SQLMetric] = cmd.metrics
+
+  protected[sql] lazy val sideEffectResult: Seq[InternalRow] = {
+    val converter = CatalystTypeConverters.createToCatalystConverter(schema)
+    val rows = cmd.run(sqlContext.sparkSession, child)
+
+    rows.map(converter(_).asInstanceOf[InternalRow])
+  }
+
+  override def children: Seq[SparkPlan] = child :: Nil
+
+  override def output: Seq[Attribute] = cmd.output
+
+  override def nodeName: String = "Execute " + cmd.nodeName
+
+  override def executeCollect(): Array[InternalRow] = sideEffectResult.toArray
+
+  override def executeToIterator: Iterator[InternalRow] = sideEffectResult.toIterator
+
+  override def executeTake(limit: Int): Array[InternalRow] = sideEffectResult.take(limit).toArray
+
+  protected override def doExecute(): RDD[InternalRow] = {
+    sqlContext.sparkContext.parallelize(sideEffectResult, 1)
+  }
+}
+
 /**
  * An explain command for users to see how a command will be executed.
  *
```
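The `sideEffectResult` pattern mirrors `ExecutedCommandExec`: because it is a `lazy val`, the write runs at most once no matter whether the operator is consumed through `executeCollect`, `executeTake`, or `doExecute`. A self-contained toy in plain Scala (no Spark types) showing the same run-once behaviour:

```scala
// Plain-Scala illustration of the run-once pattern used by DataWritingCommandExec.
class RunOnce(body: => Seq[Int]) {
  // Evaluated the first time it is touched, then cached.
  lazy val sideEffectResult: Seq[Int] = body

  def executeCollect(): Array[Int] = sideEffectResult.toArray
  def executeTake(limit: Int): Array[Int] = sideEffectResult.take(limit).toArray
}

val op = new RunOnce({ println("writing..."); Seq(1, 2, 3) })
op.executeCollect() // prints "writing..." once and returns Array(1, 2, 3)
op.executeTake(1)   // reuses the cached result; nothing is printed again
```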

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala

Lines changed: 3 additions & 13 deletions
```diff
@@ -456,17 +456,6 @@ case class DataSource(
     val caseSensitive = sparkSession.sessionState.conf.caseSensitiveAnalysis
     PartitioningUtils.validatePartitionColumn(data.schema, partitionColumns, caseSensitive)

-
-    // SPARK-17230: Resolve the partition columns so InsertIntoHadoopFsRelationCommand does
-    // not need to have the query as child, to avoid to analyze an optimized query,
-    // because InsertIntoHadoopFsRelationCommand will be optimized first.
-    val partitionAttributes = partitionColumns.map { name =>
-      data.output.find(a => equality(a.name, name)).getOrElse {
-        throw new AnalysisException(
-          s"Unable to resolve $name given [${data.output.map(_.name).mkString(", ")}]")
-      }
-    }
-
     val fileIndex = catalogTable.map(_.identifier).map { tableIdent =>
       sparkSession.table(tableIdent).queryExecution.analyzed.collect {
         case LogicalRelation(t: HadoopFsRelation, _, _, _) => t.location
@@ -479,14 +468,15 @@
       outputPath = outputPath,
       staticPartitions = Map.empty,
       ifPartitionNotExists = false,
-      partitionColumns = partitionAttributes,
+      partitionColumns = partitionColumns.map(UnresolvedAttribute.quoted),
       bucketSpec = bucketSpec,
       fileFormat = format,
       options = options,
       query = data,
       mode = mode,
       catalogTable = catalogTable,
-      fileIndex = fileIndex)
+      fileIndex = fileIndex,
+      outputColumns = data.output)
   }

   /**
```
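The removed SPARK-17230 block is no longer needed: since the command is now analyzed with its query as a real child, the analyzer resolves the partition columns against the child's output, so `DataSource` can pass them unresolved. A minimal illustration of the construction used above; the column names are hypothetical.

```scala
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute

val partitionColumns = Seq("year", "month")   // hypothetical partition column names
// Each name becomes an UnresolvedAttribute; resolution now happens during analysis of
// InsertIntoHadoopFsRelationCommand rather than eagerly inside DataSource.
val partitionAttributes = partitionColumns.map(UnresolvedAttribute.quoted)
```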

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala

Lines changed: 2 additions & 1 deletion
```diff
@@ -208,7 +208,8 @@ case class DataSourceAnalysis(conf: SQLConf) extends Rule[LogicalPlan] with Cast
         actualQuery,
         mode,
         table,
-        Some(t.location))
+        Some(t.location),
+        actualQuery.output)
   }
 }

```

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala

Lines changed: 12 additions & 12 deletions
```diff
@@ -39,7 +39,7 @@ import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.expressions.{UnsafeProjection, _}
 import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
 import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils}
-import org.apache.spark.sql.execution.{QueryExecution, SortExec, SQLExecution}
+import org.apache.spark.sql.execution.{SortExec, SparkPlan, SQLExecution}
 import org.apache.spark.sql.types.StringType
 import org.apache.spark.util.{SerializableConfiguration, Utils}

@@ -56,7 +56,9 @@ object FileFormatWriter extends Logging {

   /** Describes how output files should be placed in the filesystem. */
   case class OutputSpec(
-      outputPath: String, customPartitionLocations: Map[TablePartitionSpec, String])
+      outputPath: String,
+      customPartitionLocations: Map[TablePartitionSpec, String],
+      outputColumns: Seq[Attribute])

   /** A shared job description for all the write tasks. */
   private class WriteJobDescription(
@@ -101,7 +103,7 @@
    */
   def write(
       sparkSession: SparkSession,
-      queryExecution: QueryExecution,
+      plan: SparkPlan,
       fileFormat: FileFormat,
       committer: FileCommitProtocol,
       outputSpec: OutputSpec,
@@ -117,11 +119,8 @@
     job.setOutputValueClass(classOf[InternalRow])
     FileOutputFormat.setOutputPath(job, new Path(outputSpec.outputPath))

-    // Pick the attributes from analyzed plan, as optimizer may not preserve the output schema
-    // names' case.
-    val allColumns = queryExecution.analyzed.output
     val partitionSet = AttributeSet(partitionColumns)
-    val dataColumns = allColumns.filterNot(partitionSet.contains)
+    val dataColumns = outputSpec.outputColumns.filterNot(partitionSet.contains)

     val bucketIdExpression = bucketSpec.map { spec =>
       val bucketColumns = spec.bucketColumnNames.map(c => dataColumns.find(_.name == c).get)
@@ -144,7 +143,7 @@
       uuid = UUID.randomUUID().toString,
       serializableHadoopConf = new SerializableConfiguration(job.getConfiguration),
       outputWriterFactory = outputWriterFactory,
-      allColumns = allColumns,
+      allColumns = outputSpec.outputColumns,
       dataColumns = dataColumns,
       partitionColumns = partitionColumns,
       bucketIdExpression = bucketIdExpression,
@@ -160,7 +159,7 @@
     // We should first sort by partition columns, then bucket id, and finally sorting columns.
     val requiredOrdering = partitionColumns ++ bucketIdExpression ++ sortColumns
     // the sort order doesn't matter
-    val actualOrdering = queryExecution.executedPlan.outputOrdering.map(_.child)
+    val actualOrdering = plan.outputOrdering.map(_.child)
     val orderingMatched = if (requiredOrdering.length > actualOrdering.length) {
       false
     } else {
@@ -178,17 +177,18 @@

     try {
       val rdd = if (orderingMatched) {
-        queryExecution.toRdd
+        plan.execute()
       } else {
         // SPARK-21165: the `requiredOrdering` is based on the attributes from analyzed plan, and
         // the physical plan may have different attribute ids due to optimizer removing some
         // aliases. Here we bind the expression ahead to avoid potential attribute ids mismatch.
         val orderingExpr = requiredOrdering
-          .map(SortOrder(_, Ascending)).map(BindReferences.bindReference(_, allColumns))
+          .map(SortOrder(_, Ascending))
+          .map(BindReferences.bindReference(_, outputSpec.outputColumns))
         SortExec(
           orderingExpr,
           global = false,
-          child = queryExecution.executedPlan).execute()
+          child = plan).execute()
       }
       val ret = new Array[WriteTaskResult](rdd.partitions.length)
       sparkSession.sparkContext.runJob(
```
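With this signature change, `FileFormatWriter.write` no longer reaches into a `QueryExecution` to recover the analyzed output; callers pass an already-planned `SparkPlan` plus the analyzed output columns inside `OutputSpec`. A hedged sketch of the new call shape from a caller's perspective, assuming a spark-shell session with some DataFrame `df`; the path is a placeholder.

```scala
import org.apache.spark.sql.execution.datasources.FileFormatWriter

// Build the spec the way callers in this patch do: path, custom partition
// locations, and the columns of the *analyzed* plan.
val outputSpec = FileFormatWriter.OutputSpec(
  outputPath = "/tmp/output",                          // placeholder path
  customPartitionLocations = Map.empty,                // no custom partition locations
  outputColumns = df.queryExecution.analyzed.output)
// The plan handed to write(...) is the planned physical child: qe.executedPlan for a
// sink, or the `child` argument inside a DataWritingCommand.run.
```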

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala

Lines changed: 6 additions & 4 deletions
```diff
@@ -27,6 +27,7 @@ import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogTable, CatalogT
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.execution.command._
 import org.apache.spark.sql.util.SchemaUtils

@@ -52,11 +53,12 @@
     query: LogicalPlan,
     mode: SaveMode,
     catalogTable: Option[CatalogTable],
-    fileIndex: Option[FileIndex])
+    fileIndex: Option[FileIndex],
+    outputColumns: Seq[Attribute])
   extends DataWritingCommand {
   import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils.escapePathName

-  override def run(sparkSession: SparkSession): Seq[Row] = {
+  override def run(sparkSession: SparkSession, child: SparkPlan): Seq[Row] = {
     // Most formats don't do well with duplicate columns, so lets not allow that
     SchemaUtils.checkSchemaColumnNameDuplication(
       query.schema,
@@ -139,11 +141,11 @@
       val updatedPartitionPaths =
         FileFormatWriter.write(
           sparkSession = sparkSession,
-          queryExecution = Dataset.ofRows(sparkSession, query).queryExecution,
+          plan = child,
           fileFormat = fileFormat,
           committer = committer,
           outputSpec = FileFormatWriter.OutputSpec(
-            qualifiedOutputPath.toString, customPartitionLocations),
+            qualifiedOutputPath.toString, customPartitionLocations, outputColumns),
           hadoopConf = hadoopConf,
           partitionColumns = partitionColumns,
           bucketSpec = bucketSpec,
```

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala

Lines changed: 3 additions & 2 deletions
```diff
@@ -118,13 +118,14 @@
           throw new RuntimeException(s"Partition column $col not found in schema ${data.schema}")
         }
       }
+      val qe = data.queryExecution

       FileFormatWriter.write(
         sparkSession = sparkSession,
-        queryExecution = data.queryExecution,
+        plan = qe.executedPlan,
         fileFormat = fileFormat,
         committer = committer,
-        outputSpec = FileFormatWriter.OutputSpec(path, Map.empty),
+        outputSpec = FileFormatWriter.OutputSpec(path, Map.empty, qe.analyzed.output),
         hadoopConf = hadoopConf,
         partitionColumns = partitionColumns,
         bucketSpec = None,
```

sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala

Lines changed: 3 additions & 2 deletions
```diff
@@ -148,7 +148,8 @@ object HiveAnalysis extends Rule[LogicalPlan] {
   override def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
     case InsertIntoTable(r: HiveTableRelation, partSpec, query, overwrite, ifPartitionNotExists)
         if DDLUtils.isHiveTable(r.tableMeta) =>
-      InsertIntoHiveTable(r.tableMeta, partSpec, query, overwrite, ifPartitionNotExists)
+      InsertIntoHiveTable(r.tableMeta, partSpec, query, overwrite,
+        ifPartitionNotExists, query.output)

     case CreateTable(tableDesc, mode, None) if DDLUtils.isHiveTable(tableDesc) =>
       DDLUtils.checkDataColNames(tableDesc)
@@ -163,7 +164,7 @@
       val outputPath = new Path(storage.locationUri.get)
       if (overwrite) DDLUtils.verifyNotReadPath(child, outputPath)

-      InsertIntoHiveDirCommand(isLocal, storage, child, overwrite)
+      InsertIntoHiveDirCommand(isLocal, storage, child, overwrite, child.output)
   }
 }
```
