From 971770955c836bb727378af8268c495a2baa7d1f Mon Sep 17 00:00:00 2001 From: Wang Gengliang Date: Wed, 20 Dec 2017 15:09:08 +0800 Subject: [PATCH 1/9] SPARK-22834: Make insert commands have real children to fix UI issues --- .../spark/sql/execution/SparkStrategies.scala | 3 +- .../command/DataWritingCommand.scala | 26 ++++------ .../sql/execution/command/commands.scala | 47 ++++++++++++++++++- .../execution/datasources/DataSource.scala | 16 ++----- .../datasources/DataSourceStrategy.scala | 3 +- .../datasources/FileFormatWriter.scala | 23 +++++---- .../InsertIntoHadoopFsRelationCommand.scala | 14 ++++-- .../execution/streaming/FileStreamSink.scala | 5 +- .../spark/sql/hive/HiveStrategies.scala | 5 +- .../execution/InsertIntoHiveDirCommand.scala | 17 +++++-- .../hive/execution/InsertIntoHiveTable.scala | 21 ++++++--- .../sql/hive/execution/SaveAsHiveFile.scala | 10 ++-- .../sql/hive/execution/SQLQuerySuite.scala | 13 +++++ 13 files changed, 135 insertions(+), 68 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index 8c6c324d456c..36c8ac1eac54 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -27,7 +27,7 @@ import org.apache.spark.sql.catalyst.plans._ import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.plans.physical._ import org.apache.spark.sql.execution.columnar.{InMemoryRelation, InMemoryTableScanExec} -import org.apache.spark.sql.execution.command._ +import org.apache.spark.sql.execution.command.{DataWritingCommand, _} import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec import org.apache.spark.sql.execution.joins.{BuildLeft, BuildRight, BuildSide} import org.apache.spark.sql.execution.streaming._ @@ -403,6 +403,7 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] { // Can we automate these 'pass through' operations? object BasicOperators extends Strategy { def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { + case d: DataWritingCommand => DataWritingCommandExec(d, d.children.map(planLater)) :: Nil case r: RunnableCommand => ExecutedCommandExec(r) :: Nil case MemoryPlan(sink, output) => diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala index 2cf06982e25f..67206e133813 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala @@ -20,30 +20,18 @@ package org.apache.spark.sql.execution.command import org.apache.hadoop.conf.Configuration import org.apache.spark.SparkContext -import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.{Row, SparkSession} +import org.apache.spark.sql.catalyst.plans.logical.{Command, LogicalPlan} +import org.apache.spark.sql.execution.SparkPlan import org.apache.spark.sql.execution.datasources.BasicWriteJobStatsTracker import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics} import org.apache.spark.util.SerializableConfiguration - /** * A special `RunnableCommand` which writes data out and updates metrics. 
*/ -trait DataWritingCommand extends RunnableCommand { - - /** - * The input query plan that produces the data to be written. - */ - def query: LogicalPlan - - // We make the input `query` an inner child instead of a child in order to hide it from the - // optimizer. This is because optimizer may not preserve the output schema names' case, and we - // have to keep the original analyzed plan here so that we can pass the corrected schema to the - // writer. The schema of analyzed plan is what user expects(or specifies), so we should respect - // it when writing. - override protected def innerChildren: Seq[LogicalPlan] = query :: Nil - - override lazy val metrics: Map[String, SQLMetric] = { +trait DataWritingCommand extends Command { + lazy val metrics: Map[String, SQLMetric] = { val sparkContext = SparkContext.getActive.get Map( "numFiles" -> SQLMetrics.createMetric(sparkContext, "number of written files"), @@ -57,4 +45,8 @@ trait DataWritingCommand extends RunnableCommand { val serializableHadoopConf = new SerializableConfiguration(hadoopConf) new BasicWriteJobStatsTracker(serializableHadoopConf, metrics) } + + def run(sparkSession: SparkSession, children: Seq[SparkPlan]): Seq[Row] = { + throw new NotImplementedError + } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala index e28b5eb2e2a2..3ce401db9f28 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala @@ -26,7 +26,7 @@ import org.apache.spark.sql.catalyst.errors.TreeNodeException import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference} import org.apache.spark.sql.catalyst.plans.QueryPlan import org.apache.spark.sql.catalyst.plans.logical.{Command, LogicalPlan} -import org.apache.spark.sql.execution.LeafExecNode +import org.apache.spark.sql.execution.{LeafExecNode, SparkPlan} import org.apache.spark.sql.execution.debug._ import org.apache.spark.sql.execution.metric.SQLMetric import org.apache.spark.sql.execution.streaming.{IncrementalExecution, OffsetSeqMetadata} @@ -87,6 +87,51 @@ case class ExecutedCommandExec(cmd: RunnableCommand) extends LeafExecNode { } } +/** + * A physical operator that executes the run method of a `RunnableCommand` and + * saves the result to prevent multiple executions. + * + * @param cmd the `RunnableCommand` this operator will run. + * @param children the children physical plans ran by the `RunnableCommand`. + */ +case class DataWritingCommandExec(cmd: DataWritingCommand, children: Seq[SparkPlan]) + extends SparkPlan { + + override lazy val metrics: Map[String, SQLMetric] = cmd.metrics + + /** + * A concrete command should override this lazy field to wrap up any side effects caused by the + * command or any other computation that should be evaluated exactly once. The value of this field + * can be used as the contents of the corresponding RDD generated from the physical plan of this + * command. + * + * The `execute()` method of all the physical command classes should reference `sideEffectResult` + * so that the command can be executed eagerly right after the command query is created. 
+ */ + protected[sql] lazy val sideEffectResult: Seq[InternalRow] = { + val converter = CatalystTypeConverters.createToCatalystConverter(schema) + val rows = cmd.run(sqlContext.sparkSession, children) + + rows.map(converter(_).asInstanceOf[InternalRow]) + } + + override def innerChildren: Seq[QueryPlan[_]] = cmd.children + + override def output: Seq[Attribute] = cmd.output + + override def nodeName: String = "Execute " + cmd.nodeName + + override def executeCollect(): Array[InternalRow] = sideEffectResult.toArray + + override def executeToIterator: Iterator[InternalRow] = sideEffectResult.toIterator + + override def executeTake(limit: Int): Array[InternalRow] = sideEffectResult.take(limit).toArray + + protected override def doExecute(): RDD[InternalRow] = { + sqlContext.sparkContext.parallelize(sideEffectResult, 1) + } +} + /** * An explain command for users to see how a command will be executed. * diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala index b676672b38cd..a8acdebcf76a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala @@ -456,17 +456,6 @@ case class DataSource( val caseSensitive = sparkSession.sessionState.conf.caseSensitiveAnalysis PartitioningUtils.validatePartitionColumn(data.schema, partitionColumns, caseSensitive) - - // SPARK-17230: Resolve the partition columns so InsertIntoHadoopFsRelationCommand does - // not need to have the query as child, to avoid to analyze an optimized query, - // because InsertIntoHadoopFsRelationCommand will be optimized first. - val partitionAttributes = partitionColumns.map { name => - data.output.find(a => equality(a.name, name)).getOrElse { - throw new AnalysisException( - s"Unable to resolve $name given [${data.output.map(_.name).mkString(", ")}]") - } - } - val fileIndex = catalogTable.map(_.identifier).map { tableIdent => sparkSession.table(tableIdent).queryExecution.analyzed.collect { case LogicalRelation(t: HadoopFsRelation, _, _, _) => t.location @@ -479,14 +468,15 @@ case class DataSource( outputPath = outputPath, staticPartitions = Map.empty, ifPartitionNotExists = false, - partitionColumns = partitionAttributes, + partitionColumns = partitionColumns.map(UnresolvedAttribute.quoted), bucketSpec = bucketSpec, fileFormat = format, options = options, query = data, mode = mode, catalogTable = catalogTable, - fileIndex = fileIndex) + fileIndex = fileIndex, + allColumns = data.output) } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala index 400f2e03165b..d94c5bbccdd8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala @@ -208,7 +208,8 @@ case class DataSourceAnalysis(conf: SQLConf) extends Rule[LogicalPlan] with Cast actualQuery, mode, table, - Some(t.location)) + Some(t.location), + actualQuery.output) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala index 1fac01a2c26c..7ad3c7770f08 100644 --- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala @@ -39,7 +39,7 @@ import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec import org.apache.spark.sql.catalyst.expressions.{UnsafeProjection, _} import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils} -import org.apache.spark.sql.execution.{QueryExecution, SortExec, SQLExecution} +import org.apache.spark.sql.execution.{SortExec, SparkPlan, SQLExecution} import org.apache.spark.sql.types.StringType import org.apache.spark.util.{SerializableConfiguration, Utils} @@ -56,7 +56,9 @@ object FileFormatWriter extends Logging { /** Describes how output files should be placed in the filesystem. */ case class OutputSpec( - outputPath: String, customPartitionLocations: Map[TablePartitionSpec, String]) + outputPath: String, + customPartitionLocations: Map[TablePartitionSpec, String], + allColumns: Seq[Attribute]) /** A shared job description for all the write tasks. */ private class WriteJobDescription( @@ -101,7 +103,7 @@ object FileFormatWriter extends Logging { */ def write( sparkSession: SparkSession, - queryExecution: QueryExecution, + plan: SparkPlan, fileFormat: FileFormat, committer: FileCommitProtocol, outputSpec: OutputSpec, @@ -117,11 +119,8 @@ object FileFormatWriter extends Logging { job.setOutputValueClass(classOf[InternalRow]) FileOutputFormat.setOutputPath(job, new Path(outputSpec.outputPath)) - // Pick the attributes from analyzed plan, as optimizer may not preserve the output schema - // names' case. - val allColumns = queryExecution.analyzed.output val partitionSet = AttributeSet(partitionColumns) - val dataColumns = allColumns.filterNot(partitionSet.contains) + val dataColumns = outputSpec.allColumns.filterNot(partitionSet.contains) val bucketIdExpression = bucketSpec.map { spec => val bucketColumns = spec.bucketColumnNames.map(c => dataColumns.find(_.name == c).get) @@ -144,7 +143,7 @@ object FileFormatWriter extends Logging { uuid = UUID.randomUUID().toString, serializableHadoopConf = new SerializableConfiguration(job.getConfiguration), outputWriterFactory = outputWriterFactory, - allColumns = allColumns, + allColumns = outputSpec.allColumns, dataColumns = dataColumns, partitionColumns = partitionColumns, bucketIdExpression = bucketIdExpression, @@ -160,7 +159,7 @@ object FileFormatWriter extends Logging { // We should first sort by partition columns, then bucket id, and finally sorting columns. val requiredOrdering = partitionColumns ++ bucketIdExpression ++ sortColumns // the sort order doesn't matter - val actualOrdering = queryExecution.executedPlan.outputOrdering.map(_.child) + val actualOrdering = plan.outputOrdering.map(_.child) val orderingMatched = if (requiredOrdering.length > actualOrdering.length) { false } else { @@ -178,17 +177,17 @@ object FileFormatWriter extends Logging { try { val rdd = if (orderingMatched) { - queryExecution.toRdd + plan.execute() } else { // SPARK-21165: the `requiredOrdering` is based on the attributes from analyzed plan, and // the physical plan may have different attribute ids due to optimizer removing some // aliases. Here we bind the expression ahead to avoid potential attribute ids mismatch. 
val orderingExpr = requiredOrdering - .map(SortOrder(_, Ascending)).map(BindReferences.bindReference(_, allColumns)) + .map(SortOrder(_, Ascending)).map(BindReferences.bindReference(_, outputSpec.allColumns)) SortExec( orderingExpr, global = false, - child = queryExecution.executedPlan).execute() + child = plan).execute() } val ret = new Array[WriteTaskResult](rdd.partitions.length) sparkSession.sparkContext.runJob( diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala index 675bee85bf61..3f3e62089592 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala @@ -27,6 +27,7 @@ import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogTable, CatalogT import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.execution.SparkPlan import org.apache.spark.sql.execution.command._ import org.apache.spark.sql.util.SchemaUtils @@ -52,11 +53,16 @@ case class InsertIntoHadoopFsRelationCommand( query: LogicalPlan, mode: SaveMode, catalogTable: Option[CatalogTable], - fileIndex: Option[FileIndex]) + fileIndex: Option[FileIndex], + allColumns: Seq[Attribute]) extends DataWritingCommand { import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils.escapePathName - override def run(sparkSession: SparkSession): Seq[Row] = { + override def children: Seq[LogicalPlan] = query :: Nil + + override def run(sparkSession: SparkSession, physicalChildren: Seq[SparkPlan]): Seq[Row] = { + assert(physicalChildren.length == 1) + // Most formats don't do well with duplicate columns, so lets not allow that SchemaUtils.checkSchemaColumnNameDuplication( query.schema, @@ -139,11 +145,11 @@ case class InsertIntoHadoopFsRelationCommand( val updatedPartitionPaths = FileFormatWriter.write( sparkSession = sparkSession, - queryExecution = Dataset.ofRows(sparkSession, query).queryExecution, + plan = physicalChildren.head, fileFormat = fileFormat, committer = committer, outputSpec = FileFormatWriter.OutputSpec( - qualifiedOutputPath.toString, customPartitionLocations), + qualifiedOutputPath.toString, customPartitionLocations, allColumns), hadoopConf = hadoopConf, partitionColumns = partitionColumns, bucketSpec = bucketSpec, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala index 6bd069662200..a7e4e24fb7ea 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala @@ -118,13 +118,14 @@ class FileStreamSink( throw new RuntimeException(s"Partition column $col not found in schema ${data.schema}") } } + val plan = data.queryExecution.executedPlan FileFormatWriter.write( sparkSession = sparkSession, - queryExecution = data.queryExecution, + plan = plan, fileFormat = fileFormat, committer = committer, - outputSpec = FileFormatWriter.OutputSpec(path, Map.empty), + outputSpec = FileFormatWriter.OutputSpec(path, Map.empty, plan.output), hadoopConf = 
hadoopConf, partitionColumns = partitionColumns, bucketSpec = None, diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala index a7961c757efa..ab857b905572 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala @@ -148,7 +148,8 @@ object HiveAnalysis extends Rule[LogicalPlan] { override def apply(plan: LogicalPlan): LogicalPlan = plan transformUp { case InsertIntoTable(r: HiveTableRelation, partSpec, query, overwrite, ifPartitionNotExists) if DDLUtils.isHiveTable(r.tableMeta) => - InsertIntoHiveTable(r.tableMeta, partSpec, query, overwrite, ifPartitionNotExists) + InsertIntoHiveTable(r.tableMeta, partSpec, query, overwrite, + ifPartitionNotExists, query.output) case CreateTable(tableDesc, mode, None) if DDLUtils.isHiveTable(tableDesc) => DDLUtils.checkDataColNames(tableDesc) @@ -163,7 +164,7 @@ object HiveAnalysis extends Rule[LogicalPlan] { val outputPath = new Path(storage.locationUri.get) if (overwrite) DDLUtils.verifyNotReadPath(child, outputPath) - InsertIntoHiveDirCommand(isLocal, storage, child, overwrite) + InsertIntoHiveDirCommand(isLocal, storage, child, overwrite, child.output) } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveDirCommand.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveDirCommand.scala index 1c6f8dd77fc2..291ba002278b 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveDirCommand.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveDirCommand.scala @@ -27,10 +27,12 @@ import org.apache.hadoop.hive.serde2.`lazy`.LazySimpleSerDe import org.apache.hadoop.mapred._ import org.apache.spark.SparkException -import org.apache.spark.sql.{Dataset, Row, SparkSession} +import org.apache.spark.sql.{Row, SparkSession} import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable} +import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.execution.SparkPlan import org.apache.spark.sql.hive.client.HiveClientImpl /** @@ -54,9 +56,13 @@ case class InsertIntoHiveDirCommand( isLocal: Boolean, storage: CatalogStorageFormat, query: LogicalPlan, - overwrite: Boolean) extends SaveAsHiveFile { + overwrite: Boolean, + allColumns: Seq[Attribute]) extends SaveAsHiveFile { - override def run(sparkSession: SparkSession): Seq[Row] = { + override def children: Seq[LogicalPlan] = query :: Nil + + override def run(sparkSession: SparkSession, physicalChildren: Seq[SparkPlan]): Seq[Row] = { + assert(physicalChildren.length == 1) assert(storage.locationUri.nonEmpty) val hiveTable = HiveClientImpl.toHiveTable(CatalogTable( @@ -98,10 +104,11 @@ case class InsertIntoHiveDirCommand( try { saveAsHiveFile( sparkSession = sparkSession, - queryExecution = Dataset.ofRows(sparkSession, query).queryExecution, + plan = physicalChildren.head, hadoopConf = hadoopConf, fileSinkConf = fileSinkConf, - outputLocation = tmpPath.toString) + outputLocation = tmpPath.toString, + allColumns = allColumns) val fs = writeToPath.getFileSystem(hadoopConf) if (overwrite && fs.exists(writeToPath)) { diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala index b46addb6aa85..841c7e99ba2b 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala @@ -23,10 +23,11 @@ import org.apache.hadoop.hive.ql.ErrorMsg import org.apache.hadoop.hive.ql.plan.TableDesc import org.apache.spark.SparkException -import org.apache.spark.sql.{AnalysisException, Dataset, Row, SparkSession} +import org.apache.spark.sql.{AnalysisException, Row, SparkSession} import org.apache.spark.sql.catalyst.catalog.{CatalogTable, ExternalCatalog} import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.execution.SparkPlan import org.apache.spark.sql.execution.command.CommandUtils import org.apache.spark.sql.hive.HiveShim.{ShimFileSinkDesc => FileSinkDesc} import org.apache.spark.sql.hive.client.HiveClientImpl @@ -67,14 +68,19 @@ case class InsertIntoHiveTable( partition: Map[String, Option[String]], query: LogicalPlan, overwrite: Boolean, - ifPartitionNotExists: Boolean) extends SaveAsHiveFile { + ifPartitionNotExists: Boolean, + allColumns: Seq[Attribute]) extends SaveAsHiveFile { + + override def children: Seq[LogicalPlan] = query :: Nil /** * Inserts all the rows in the table into Hive. Row objects are properly serialized with the * `org.apache.hadoop.hive.serde2.SerDe` and the * `org.apache.hadoop.mapred.OutputFormat` provided by the table definition. */ - override def run(sparkSession: SparkSession): Seq[Row] = { + override def run(sparkSession: SparkSession, physicalChildren: Seq[SparkPlan]): Seq[Row] = { + assert(physicalChildren.length == 1) + val externalCatalog = sparkSession.sharedState.externalCatalog val hadoopConf = sparkSession.sessionState.newHadoopConf() @@ -94,7 +100,8 @@ case class InsertIntoHiveTable( val tmpLocation = getExternalTmpPath(sparkSession, hadoopConf, tableLocation) try { - processInsert(sparkSession, externalCatalog, hadoopConf, tableDesc, tmpLocation) + processInsert(sparkSession, externalCatalog, hadoopConf, tableDesc, tmpLocation, + physicalChildren) } finally { // Attempt to delete the staging directory and the inclusive files. If failed, the files are // expected to be dropped at the normal termination of VM since deleteOnExit is used. 
@@ -119,7 +126,8 @@ case class InsertIntoHiveTable( externalCatalog: ExternalCatalog, hadoopConf: Configuration, tableDesc: TableDesc, - tmpLocation: Path): Unit = { + tmpLocation: Path, + physicalChildren: Seq[SparkPlan]): Unit = { val fileSinkConf = new FileSinkDesc(tmpLocation.toString, tableDesc, false) val numDynamicPartitions = partition.values.count(_.isEmpty) @@ -191,10 +199,11 @@ case class InsertIntoHiveTable( saveAsHiveFile( sparkSession = sparkSession, - queryExecution = Dataset.ofRows(sparkSession, query).queryExecution, + plan = physicalChildren.head, hadoopConf = hadoopConf, fileSinkConf = fileSinkConf, outputLocation = tmpLocation.toString, + allColumns = allColumns, partitionAttributes = partitionAttributes) if (partition.nonEmpty) { diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala index 63657590e5e7..9a6607f2f2c6 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala @@ -33,7 +33,7 @@ import org.apache.spark.internal.io.FileCommitProtocol import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec import org.apache.spark.sql.catalyst.expressions.Attribute -import org.apache.spark.sql.execution.QueryExecution +import org.apache.spark.sql.execution.SparkPlan import org.apache.spark.sql.execution.command.DataWritingCommand import org.apache.spark.sql.execution.datasources.FileFormatWriter import org.apache.spark.sql.hive.HiveExternalCatalog @@ -47,10 +47,11 @@ private[hive] trait SaveAsHiveFile extends DataWritingCommand { protected def saveAsHiveFile( sparkSession: SparkSession, - queryExecution: QueryExecution, + plan: SparkPlan, hadoopConf: Configuration, fileSinkConf: FileSinkDesc, outputLocation: String, + allColumns: Seq[Attribute], customPartitionLocations: Map[TablePartitionSpec, String] = Map.empty, partitionAttributes: Seq[Attribute] = Nil): Set[String] = { @@ -75,10 +76,11 @@ private[hive] trait SaveAsHiveFile extends DataWritingCommand { FileFormatWriter.write( sparkSession = sparkSession, - queryExecution = queryExecution, + plan = plan, fileFormat = new HiveFileFormat(fileSinkConf), committer = committer, - outputSpec = FileFormatWriter.OutputSpec(outputLocation, customPartitionLocations), + outputSpec = + FileFormatWriter.OutputSpec(outputLocation, customPartitionLocations, allColumns), hadoopConf = hadoopConf, partitionColumns = partitionAttributes, bucketSpec = None, diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala index 07ae3ae94584..63aacc840e31 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala @@ -1970,6 +1970,19 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton { } } + test("SPARK-22252: FileFormatWriter should respect the input query schema in HIVE") { + withTable("t1", "t2", "t3", "t4") { + spark.range(1).select('id as 'col1, 'id as 'col2).write.saveAsTable("t1") + spark.sql("select COL1, COL2 from t1").write.format("hive").saveAsTable("t2") + checkAnswer(spark.table("t2"), Row(0, 0)) + + // Test picking part of the columns when writing. 
+ spark.range(1).select('id, 'id as 'col1, 'id as 'col2).write.saveAsTable("t3") + spark.sql("select COL1, COL2 from t3").write.format("hive").saveAsTable("t4") + checkAnswer(spark.table("t4"), Row(0, 0)) + } + } + test("Auto alias construction of get_json_object") { val df = Seq(("1", """{"f1": "value1", "f5": 5.23}""")).toDF("key", "jstring") val expectedMsg = "Cannot create a table having a column whose name contains commas " + From d49d0f027ee0d7052b71a53038ba0638f770e988 Mon Sep 17 00:00:00 2001 From: Wang Gengliang Date: Wed, 20 Dec 2017 22:57:14 +0800 Subject: [PATCH 2/9] address comments --- .../command/DataWritingCommand.scala | 20 +++++++++++++++---- .../sql/execution/command/commands.scala | 2 +- .../InsertIntoHadoopFsRelationCommand.scala | 2 -- .../execution/InsertIntoHiveDirCommand.scala | 2 -- .../hive/execution/InsertIntoHiveTable.scala | 2 -- 5 files changed, 17 insertions(+), 11 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala index 67206e133813..6ca802e78063 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala @@ -21,9 +21,11 @@ import org.apache.hadoop.conf.Configuration import org.apache.spark.SparkContext import org.apache.spark.sql.{Row, SparkSession} -import org.apache.spark.sql.catalyst.plans.logical.{Command, LogicalPlan} +import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.catalyst.plans.logical.{AnalysisBarrier, Command, LogicalPlan} import org.apache.spark.sql.execution.SparkPlan import org.apache.spark.sql.execution.datasources.BasicWriteJobStatsTracker +import org.apache.spark.sql.execution.datasources.FileFormatWriter import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics} import org.apache.spark.util.SerializableConfiguration @@ -31,6 +33,18 @@ import org.apache.spark.util.SerializableConfiguration * A special `RunnableCommand` which writes data out and updates metrics. */ trait DataWritingCommand extends Command { + /** + * The input query plan that produces the data to be written. + * IMPORTANT: the input query plan MUST be analyzed, so that we can carry its output columns + * to [[FileFormatWriter]]. 
+ */ + def query: LogicalPlan + + override def children: Seq[LogicalPlan] = query :: Nil + + // Output columns of the analyzed input query plan + def allColumns: Seq[Attribute] + lazy val metrics: Map[String, SQLMetric] = { val sparkContext = SparkContext.getActive.get Map( @@ -46,7 +60,5 @@ trait DataWritingCommand extends Command { new BasicWriteJobStatsTracker(serializableHadoopConf, metrics) } - def run(sparkSession: SparkSession, children: Seq[SparkPlan]): Seq[Row] = { - throw new NotImplementedError - } + def run(sparkSession: SparkSession, children: Seq[SparkPlan]): Seq[Row] } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala index 3ce401db9f28..5c70b058f135 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala @@ -115,7 +115,7 @@ case class DataWritingCommandExec(cmd: DataWritingCommand, children: Seq[SparkPl rows.map(converter(_).asInstanceOf[InternalRow]) } - override def innerChildren: Seq[QueryPlan[_]] = cmd.children + override def innerChildren: Seq[QueryPlan[_]] = cmd :: Nil override def output: Seq[Attribute] = cmd.output diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala index 3f3e62089592..5d7f18198e7c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala @@ -58,8 +58,6 @@ case class InsertIntoHadoopFsRelationCommand( extends DataWritingCommand { import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils.escapePathName - override def children: Seq[LogicalPlan] = query :: Nil - override def run(sparkSession: SparkSession, physicalChildren: Seq[SparkPlan]): Seq[Row] = { assert(physicalChildren.length == 1) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveDirCommand.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveDirCommand.scala index 291ba002278b..b0adfec2e66d 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveDirCommand.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveDirCommand.scala @@ -59,8 +59,6 @@ case class InsertIntoHiveDirCommand( overwrite: Boolean, allColumns: Seq[Attribute]) extends SaveAsHiveFile { - override def children: Seq[LogicalPlan] = query :: Nil - override def run(sparkSession: SparkSession, physicalChildren: Seq[SparkPlan]): Seq[Row] = { assert(physicalChildren.length == 1) assert(storage.locationUri.nonEmpty) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala index 841c7e99ba2b..5fe90e0752fc 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala @@ -71,8 +71,6 @@ case class InsertIntoHiveTable( ifPartitionNotExists: Boolean, allColumns: Seq[Attribute]) extends SaveAsHiveFile { - override def children: Seq[LogicalPlan] = query :: Nil - /** * 
Inserts all the rows in the table into Hive. Row objects are properly serialized with the * `org.apache.hadoop.hive.serde2.SerDe` and the From 043cd55758b50a3c1c2cea5b45651960cb76db37 Mon Sep 17 00:00:00 2001 From: Wang Gengliang Date: Thu, 21 Dec 2017 16:30:32 +0800 Subject: [PATCH 3/9] address comments --- .../spark/sql/execution/command/DataWritingCommand.scala | 4 ++-- .../apache/spark/sql/execution/command/commands.scala | 6 +++--- .../spark/sql/execution/datasources/DataSource.scala | 2 +- .../sql/execution/datasources/FileFormatWriter.scala | 9 +++++---- .../datasources/InsertIntoHadoopFsRelationCommand.scala | 4 ++-- .../sql/hive/execution/InsertIntoHiveDirCommand.scala | 4 ++-- .../spark/sql/hive/execution/InsertIntoHiveTable.scala | 4 ++-- 7 files changed, 17 insertions(+), 16 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala index 6ca802e78063..96b47ac1b637 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala @@ -30,7 +30,7 @@ import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics} import org.apache.spark.util.SerializableConfiguration /** - * A special `RunnableCommand` which writes data out and updates metrics. + * A special `Command` which writes data out and updates metrics. */ trait DataWritingCommand extends Command { /** @@ -43,7 +43,7 @@ trait DataWritingCommand extends Command { override def children: Seq[LogicalPlan] = query :: Nil // Output columns of the analyzed input query plan - def allColumns: Seq[Attribute] + def outputColumns: Seq[Attribute] lazy val metrics: Map[String, SQLMetric] = { val sparkContext = SparkContext.getActive.get diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala index 5c70b058f135..6a76db684d5c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala @@ -88,11 +88,11 @@ case class ExecutedCommandExec(cmd: RunnableCommand) extends LeafExecNode { } /** - * A physical operator that executes the run method of a `RunnableCommand` and + * A physical operator that executes the run method of a `DataWritingCommand` and * saves the result to prevent multiple executions. * - * @param cmd the `RunnableCommand` this operator will run. - * @param children the children physical plans ran by the `RunnableCommand`. + * @param cmd the `DataWritingCommand` this operator will run. + * @param children the children physical plans ran by the `DataWritingCommand`. 
*/ case class DataWritingCommandExec(cmd: DataWritingCommand, children: Seq[SparkPlan]) extends SparkPlan { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala index a8acdebcf76a..25e121050427 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala @@ -476,7 +476,7 @@ case class DataSource( mode = mode, catalogTable = catalogTable, fileIndex = fileIndex, - allColumns = data.output) + outputColumns = data.output) } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala index 7ad3c7770f08..1d80a69bc5a1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala @@ -58,7 +58,7 @@ object FileFormatWriter extends Logging { case class OutputSpec( outputPath: String, customPartitionLocations: Map[TablePartitionSpec, String], - allColumns: Seq[Attribute]) + outputColumns: Seq[Attribute]) /** A shared job description for all the write tasks. */ private class WriteJobDescription( @@ -120,7 +120,7 @@ object FileFormatWriter extends Logging { FileOutputFormat.setOutputPath(job, new Path(outputSpec.outputPath)) val partitionSet = AttributeSet(partitionColumns) - val dataColumns = outputSpec.allColumns.filterNot(partitionSet.contains) + val dataColumns = outputSpec.outputColumns.filterNot(partitionSet.contains) val bucketIdExpression = bucketSpec.map { spec => val bucketColumns = spec.bucketColumnNames.map(c => dataColumns.find(_.name == c).get) @@ -143,7 +143,7 @@ object FileFormatWriter extends Logging { uuid = UUID.randomUUID().toString, serializableHadoopConf = new SerializableConfiguration(job.getConfiguration), outputWriterFactory = outputWriterFactory, - allColumns = outputSpec.allColumns, + allColumns = outputSpec.outputColumns, dataColumns = dataColumns, partitionColumns = partitionColumns, bucketIdExpression = bucketIdExpression, @@ -183,7 +183,8 @@ object FileFormatWriter extends Logging { // the physical plan may have different attribute ids due to optimizer removing some // aliases. Here we bind the expression ahead to avoid potential attribute ids mismatch. 
val orderingExpr = requiredOrdering - .map(SortOrder(_, Ascending)).map(BindReferences.bindReference(_, outputSpec.allColumns)) + .map(SortOrder(_, Ascending)) + .map(BindReferences.bindReference(_, outputSpec.outputColumns)) SortExec( orderingExpr, global = false, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala index 5d7f18198e7c..f34c33449e9f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala @@ -54,7 +54,7 @@ case class InsertIntoHadoopFsRelationCommand( mode: SaveMode, catalogTable: Option[CatalogTable], fileIndex: Option[FileIndex], - allColumns: Seq[Attribute]) + outputColumns: Seq[Attribute]) extends DataWritingCommand { import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils.escapePathName @@ -147,7 +147,7 @@ case class InsertIntoHadoopFsRelationCommand( fileFormat = fileFormat, committer = committer, outputSpec = FileFormatWriter.OutputSpec( - qualifiedOutputPath.toString, customPartitionLocations, allColumns), + qualifiedOutputPath.toString, customPartitionLocations, outputColumns), hadoopConf = hadoopConf, partitionColumns = partitionColumns, bucketSpec = bucketSpec, diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveDirCommand.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveDirCommand.scala index b0adfec2e66d..ec691313d9a6 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveDirCommand.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveDirCommand.scala @@ -57,7 +57,7 @@ case class InsertIntoHiveDirCommand( storage: CatalogStorageFormat, query: LogicalPlan, overwrite: Boolean, - allColumns: Seq[Attribute]) extends SaveAsHiveFile { + outputColumns: Seq[Attribute]) extends SaveAsHiveFile { override def run(sparkSession: SparkSession, physicalChildren: Seq[SparkPlan]): Seq[Row] = { assert(physicalChildren.length == 1) @@ -106,7 +106,7 @@ case class InsertIntoHiveDirCommand( hadoopConf = hadoopConf, fileSinkConf = fileSinkConf, outputLocation = tmpPath.toString, - allColumns = allColumns) + allColumns = outputColumns) val fs = writeToPath.getFileSystem(hadoopConf) if (overwrite && fs.exists(writeToPath)) { diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala index 5fe90e0752fc..43169813ec91 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala @@ -69,7 +69,7 @@ case class InsertIntoHiveTable( query: LogicalPlan, overwrite: Boolean, ifPartitionNotExists: Boolean, - allColumns: Seq[Attribute]) extends SaveAsHiveFile { + outputColumns: Seq[Attribute]) extends SaveAsHiveFile { /** * Inserts all the rows in the table into Hive. 
Row objects are properly serialized with the @@ -201,7 +201,7 @@ case class InsertIntoHiveTable( hadoopConf = hadoopConf, fileSinkConf = fileSinkConf, outputLocation = tmpLocation.toString, - allColumns = allColumns, + allColumns = outputColumns, partitionAttributes = partitionAttributes) if (partition.nonEmpty) { From cb1043171bc6bfc0f56a1aa182d5c6f0de376b68 Mon Sep 17 00:00:00 2001 From: Wang Gengliang Date: Thu, 21 Dec 2017 23:03:50 +0800 Subject: [PATCH 4/9] address comments --- .../spark/sql/execution/SparkStrategies.scala | 2 +- .../execution/command/DataWritingCommand.scala | 4 ++-- .../spark/sql/execution/command/commands.scala | 17 +++++------------ .../InsertIntoHadoopFsRelationCommand.scala | 6 ++---- .../execution/streaming/FileStreamSink.scala | 6 +++--- .../execution/InsertIntoHiveDirCommand.scala | 5 ++--- .../hive/execution/InsertIntoHiveTable.scala | 11 ++++------- 7 files changed, 19 insertions(+), 32 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index 36c8ac1eac54..85ada17d75ee 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -403,7 +403,7 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] { // Can we automate these 'pass through' operations? object BasicOperators extends Strategy { def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { - case d: DataWritingCommand => DataWritingCommandExec(d, d.children.map(planLater)) :: Nil + case d: DataWritingCommand => DataWritingCommandExec(d, planLater(d.query)) :: Nil case r: RunnableCommand => ExecutedCommandExec(r) :: Nil case MemoryPlan(sink, output) => diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala index 96b47ac1b637..62983daec52b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala @@ -40,7 +40,7 @@ trait DataWritingCommand extends Command { */ def query: LogicalPlan - override def children: Seq[LogicalPlan] = query :: Nil + override final def children: Seq[LogicalPlan] = query :: Nil // Output columns of the analyzed input query plan def outputColumns: Seq[Attribute] @@ -60,5 +60,5 @@ trait DataWritingCommand extends Command { new BasicWriteJobStatsTracker(serializableHadoopConf, metrics) } - def run(sparkSession: SparkSession, children: Seq[SparkPlan]): Seq[Row] + def run(sparkSession: SparkSession, child: SparkPlan): Seq[Row] } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala index 6a76db684d5c..52f081252d06 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala @@ -92,29 +92,22 @@ case class ExecutedCommandExec(cmd: RunnableCommand) extends LeafExecNode { * saves the result to prevent multiple executions. * * @param cmd the `DataWritingCommand` this operator will run. - * @param children the children physical plans ran by the `DataWritingCommand`. + * @param child the physical plan child ran by the `DataWritingCommand`. 
*/ -case class DataWritingCommandExec(cmd: DataWritingCommand, children: Seq[SparkPlan]) +case class DataWritingCommandExec(cmd: DataWritingCommand, child: SparkPlan) extends SparkPlan { override lazy val metrics: Map[String, SQLMetric] = cmd.metrics - /** - * A concrete command should override this lazy field to wrap up any side effects caused by the - * command or any other computation that should be evaluated exactly once. The value of this field - * can be used as the contents of the corresponding RDD generated from the physical plan of this - * command. - * - * The `execute()` method of all the physical command classes should reference `sideEffectResult` - * so that the command can be executed eagerly right after the command query is created. - */ protected[sql] lazy val sideEffectResult: Seq[InternalRow] = { val converter = CatalystTypeConverters.createToCatalystConverter(schema) - val rows = cmd.run(sqlContext.sparkSession, children) + val rows = cmd.run(sqlContext.sparkSession, child) rows.map(converter(_).asInstanceOf[InternalRow]) } + override def children: Seq[QueryPlan[_]] = cmd :: Nil + override def innerChildren: Seq[QueryPlan[_]] = cmd :: Nil override def output: Seq[Attribute] = cmd.output diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala index f34c33449e9f..ad24e280d942 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala @@ -58,9 +58,7 @@ case class InsertIntoHadoopFsRelationCommand( extends DataWritingCommand { import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils.escapePathName - override def run(sparkSession: SparkSession, physicalChildren: Seq[SparkPlan]): Seq[Row] = { - assert(physicalChildren.length == 1) - + override def run(sparkSession: SparkSession, child: SparkPlan): Seq[Row] = { // Most formats don't do well with duplicate columns, so lets not allow that SchemaUtils.checkSchemaColumnNameDuplication( query.schema, @@ -143,7 +141,7 @@ case class InsertIntoHadoopFsRelationCommand( val updatedPartitionPaths = FileFormatWriter.write( sparkSession = sparkSession, - plan = physicalChildren.head, + plan = child, fileFormat = fileFormat, committer = committer, outputSpec = FileFormatWriter.OutputSpec( diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala index a7e4e24fb7ea..2715fa93d0e9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala @@ -118,14 +118,14 @@ class FileStreamSink( throw new RuntimeException(s"Partition column $col not found in schema ${data.schema}") } } - val plan = data.queryExecution.executedPlan + val qe = data.queryExecution FileFormatWriter.write( sparkSession = sparkSession, - plan = plan, + plan = qe.executedPlan, fileFormat = fileFormat, committer = committer, - outputSpec = FileFormatWriter.OutputSpec(path, Map.empty, plan.output), + outputSpec = FileFormatWriter.OutputSpec(path, Map.empty, qe.analyzed.output), hadoopConf = hadoopConf, partitionColumns = partitionColumns, bucketSpec = None, diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveDirCommand.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveDirCommand.scala index ec691313d9a6..cebeca0ce944 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveDirCommand.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveDirCommand.scala @@ -59,8 +59,7 @@ case class InsertIntoHiveDirCommand( overwrite: Boolean, outputColumns: Seq[Attribute]) extends SaveAsHiveFile { - override def run(sparkSession: SparkSession, physicalChildren: Seq[SparkPlan]): Seq[Row] = { - assert(physicalChildren.length == 1) + override def run(sparkSession: SparkSession, child: SparkPlan): Seq[Row] = { assert(storage.locationUri.nonEmpty) val hiveTable = HiveClientImpl.toHiveTable(CatalogTable( @@ -102,7 +101,7 @@ case class InsertIntoHiveDirCommand( try { saveAsHiveFile( sparkSession = sparkSession, - plan = physicalChildren.head, + plan = child, hadoopConf = hadoopConf, fileSinkConf = fileSinkConf, outputLocation = tmpPath.toString, diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala index 43169813ec91..3ce5b8469d6f 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala @@ -76,9 +76,7 @@ case class InsertIntoHiveTable( * `org.apache.hadoop.hive.serde2.SerDe` and the * `org.apache.hadoop.mapred.OutputFormat` provided by the table definition. */ - override def run(sparkSession: SparkSession, physicalChildren: Seq[SparkPlan]): Seq[Row] = { - assert(physicalChildren.length == 1) - + override def run(sparkSession: SparkSession, child: SparkPlan): Seq[Row] = { val externalCatalog = sparkSession.sharedState.externalCatalog val hadoopConf = sparkSession.sessionState.newHadoopConf() @@ -98,8 +96,7 @@ case class InsertIntoHiveTable( val tmpLocation = getExternalTmpPath(sparkSession, hadoopConf, tableLocation) try { - processInsert(sparkSession, externalCatalog, hadoopConf, tableDesc, tmpLocation, - physicalChildren) + processInsert(sparkSession, externalCatalog, hadoopConf, tableDesc, tmpLocation, child) } finally { // Attempt to delete the staging directory and the inclusive files. If failed, the files are // expected to be dropped at the normal termination of VM since deleteOnExit is used. 
@@ -125,7 +122,7 @@ case class InsertIntoHiveTable( hadoopConf: Configuration, tableDesc: TableDesc, tmpLocation: Path, - physicalChildren: Seq[SparkPlan]): Unit = { + child: SparkPlan): Unit = { val fileSinkConf = new FileSinkDesc(tmpLocation.toString, tableDesc, false) val numDynamicPartitions = partition.values.count(_.isEmpty) @@ -197,7 +194,7 @@ case class InsertIntoHiveTable( saveAsHiveFile( sparkSession = sparkSession, - plan = physicalChildren.head, + plan = child, hadoopConf = hadoopConf, fileSinkConf = fileSinkConf, outputLocation = tmpLocation.toString, From c1b542f6cfaf52bf6f4d909c54ea2911d31d8d2d Mon Sep 17 00:00:00 2001 From: Wang Gengliang Date: Fri, 22 Dec 2017 00:11:34 +0800 Subject: [PATCH 5/9] fix DataWritingCommandExec --- .../scala/org/apache/spark/sql/execution/command/commands.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala index 52f081252d06..8a61aac9e01e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala @@ -106,7 +106,7 @@ case class DataWritingCommandExec(cmd: DataWritingCommand, child: SparkPlan) rows.map(converter(_).asInstanceOf[InternalRow]) } - override def children: Seq[QueryPlan[_]] = cmd :: Nil + override def children: Seq[SparkPlan] = child :: Nil override def innerChildren: Seq[QueryPlan[_]] = cmd :: Nil From 21b5318cd1a9e0afccbe2f9024f940cec1507496 Mon Sep 17 00:00:00 2001 From: Wang Gengliang Date: Fri, 22 Dec 2017 01:28:19 +0800 Subject: [PATCH 6/9] remove useless code --- .../scala/org/apache/spark/sql/execution/command/commands.scala | 2 -- 1 file changed, 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala index 8a61aac9e01e..2cc0e38adc2e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala @@ -108,8 +108,6 @@ case class DataWritingCommandExec(cmd: DataWritingCommand, child: SparkPlan) override def children: Seq[SparkPlan] = child :: Nil - override def innerChildren: Seq[QueryPlan[_]] = cmd :: Nil - override def output: Seq[Attribute] = cmd.output override def nodeName: String = "Execute " + cmd.nodeName From bdffa6d42c7912a4ed63a64e756c7bf66eb0a274 Mon Sep 17 00:00:00 2001 From: Wang Gengliang Date: Fri, 22 Dec 2017 21:08:52 +0800 Subject: [PATCH 7/9] add AnalysisBarrier --- .../spark/sql/execution/command/DataWritingCommand.scala | 2 ++ .../spark/sql/execution/datasources/DataSource.scala | 4 ++-- .../sql/execution/datasources/DataSourceStrategy.scala | 4 ++-- .../scala/org/apache/spark/sql/hive/HiveStrategies.scala | 7 +++---- 4 files changed, 9 insertions(+), 8 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala index 62983daec52b..02cdf8828234 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala @@ -37,6 +37,8 @@ trait DataWritingCommand extends Command { * The input query plan that produces the data to 
be written. * IMPORTANT: the input query plan MUST be analyzed, so that we can carry its output columns * to [[FileFormatWriter]]. + * For performance consideration, it is suggested to construct [[DataWritingCommand]] + * with its input query as [[AnalysisBarrier]] to avoid the query being analyzed again. */ def query: LogicalPlan diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala index 25e121050427..9471b88689bd 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala @@ -31,7 +31,7 @@ import org.apache.spark.internal.Logging import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogStorageFormat, CatalogTable, CatalogUtils} -import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.catalyst.plans.logical.{AnalysisBarrier, LogicalPlan} import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap import org.apache.spark.sql.execution.datasources.csv.CSVFileFormat import org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider @@ -472,7 +472,7 @@ case class DataSource( bucketSpec = bucketSpec, fileFormat = format, options = options, - query = data, + query = AnalysisBarrier(data), mode = mode, catalogTable = catalogTable, fileIndex = fileIndex, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala index d94c5bbccdd8..34d60a43f7da 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala @@ -32,7 +32,7 @@ import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.catalyst.expressions import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.planning.PhysicalOperation -import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoDir, InsertIntoTable, LogicalPlan, Project} +import org.apache.spark.sql.catalyst.plans.logical.{Filter => _, _} import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.execution.{RowDataSourceScanExec, SparkPlan} @@ -205,7 +205,7 @@ case class DataSourceAnalysis(conf: SQLConf) extends Rule[LogicalPlan] with Cast t.bucketSpec, t.fileFormat, t.options, - actualQuery, + AnalysisBarrier(actualQuery), mode, table, Some(t.location), diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala index ab857b905572..692627291619 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala @@ -26,8 +26,7 @@ import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.planning._ -import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoDir, InsertIntoTable, LogicalPlan, - ScriptTransformation} +import org.apache.spark.sql.catalyst.plans.logical._ import 
org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.execution._ import org.apache.spark.sql.execution.command.{CreateTableCommand, DDLUtils} @@ -148,7 +147,7 @@ object HiveAnalysis extends Rule[LogicalPlan] { override def apply(plan: LogicalPlan): LogicalPlan = plan transformUp { case InsertIntoTable(r: HiveTableRelation, partSpec, query, overwrite, ifPartitionNotExists) if DDLUtils.isHiveTable(r.tableMeta) => - InsertIntoHiveTable(r.tableMeta, partSpec, query, overwrite, + InsertIntoHiveTable(r.tableMeta, partSpec, AnalysisBarrier(query), overwrite, ifPartitionNotExists, query.output) case CreateTable(tableDesc, mode, None) if DDLUtils.isHiveTable(tableDesc) => @@ -164,7 +163,7 @@ object HiveAnalysis extends Rule[LogicalPlan] { val outputPath = new Path(storage.locationUri.get) if (overwrite) DDLUtils.verifyNotReadPath(child, outputPath) - InsertIntoHiveDirCommand(isLocal, storage, child, overwrite, child.output) + InsertIntoHiveDirCommand(isLocal, storage, AnalysisBarrier(child), overwrite, child.output) } } From 0e4d2e1d8f8fba45713827712244b669bfdde2a9 Mon Sep 17 00:00:00 2001 From: Wang Gengliang Date: Tue, 26 Dec 2017 15:23:43 +0800 Subject: [PATCH 8/9] Revert "add AnalysisBarrier" This reverts commit 787e6775ec2ccbcfdcd88e9460f14e4d7f658a98. --- .../spark/sql/execution/command/DataWritingCommand.scala | 2 -- .../spark/sql/execution/datasources/DataSource.scala | 4 ++-- .../sql/execution/datasources/DataSourceStrategy.scala | 4 ++-- .../scala/org/apache/spark/sql/hive/HiveStrategies.scala | 7 ++++--- 4 files changed, 8 insertions(+), 9 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala index 02cdf8828234..62983daec52b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala @@ -37,8 +37,6 @@ trait DataWritingCommand extends Command { * The input query plan that produces the data to be written. * IMPORTANT: the input query plan MUST be analyzed, so that we can carry its output columns * to [[FileFormatWriter]]. - * For performance consideration, it is suggested to construct [[DataWritingCommand]] - * with its input query as [[AnalysisBarrier]] to avoid the query being analyzed again. 
*/ def query: LogicalPlan diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala index 9471b88689bd..25e121050427 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala @@ -31,7 +31,7 @@ import org.apache.spark.internal.Logging import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogStorageFormat, CatalogTable, CatalogUtils} -import org.apache.spark.sql.catalyst.plans.logical.{AnalysisBarrier, LogicalPlan} +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap import org.apache.spark.sql.execution.datasources.csv.CSVFileFormat import org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider @@ -472,7 +472,7 @@ case class DataSource( bucketSpec = bucketSpec, fileFormat = format, options = options, - query = AnalysisBarrier(data), + query = data, mode = mode, catalogTable = catalogTable, fileIndex = fileIndex, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala index 34d60a43f7da..d94c5bbccdd8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala @@ -32,7 +32,7 @@ import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.catalyst.expressions import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.planning.PhysicalOperation -import org.apache.spark.sql.catalyst.plans.logical.{Filter => _, _} +import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoDir, InsertIntoTable, LogicalPlan, Project} import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.execution.{RowDataSourceScanExec, SparkPlan} @@ -205,7 +205,7 @@ case class DataSourceAnalysis(conf: SQLConf) extends Rule[LogicalPlan] with Cast t.bucketSpec, t.fileFormat, t.options, - AnalysisBarrier(actualQuery), + actualQuery, mode, table, Some(t.location), diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala index 692627291619..ab857b905572 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala @@ -26,7 +26,8 @@ import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.planning._ -import org.apache.spark.sql.catalyst.plans.logical._ +import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoDir, InsertIntoTable, LogicalPlan, + ScriptTransformation} import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.execution._ import org.apache.spark.sql.execution.command.{CreateTableCommand, DDLUtils} @@ -147,7 +148,7 @@ object HiveAnalysis extends Rule[LogicalPlan] { override def apply(plan: LogicalPlan): LogicalPlan = plan transformUp { case 
InsertIntoTable(r: HiveTableRelation, partSpec, query, overwrite, ifPartitionNotExists) if DDLUtils.isHiveTable(r.tableMeta) => - InsertIntoHiveTable(r.tableMeta, partSpec, AnalysisBarrier(query), overwrite, + InsertIntoHiveTable(r.tableMeta, partSpec, query, overwrite, ifPartitionNotExists, query.output) case CreateTable(tableDesc, mode, None) if DDLUtils.isHiveTable(tableDesc) => @@ -163,7 +164,7 @@ object HiveAnalysis extends Rule[LogicalPlan] { val outputPath = new Path(storage.locationUri.get) if (overwrite) DDLUtils.verifyNotReadPath(child, outputPath) - InsertIntoHiveDirCommand(isLocal, storage, AnalysisBarrier(child), overwrite, child.output) + InsertIntoHiveDirCommand(isLocal, storage, child, overwrite, child.output) } } From 18ec01638b9da7f8150e3ea35c2876d6d1f41f3d Mon Sep 17 00:00:00 2001 From: Wang Gengliang Date: Fri, 29 Dec 2017 11:23:23 +0800 Subject: [PATCH 9/9] add one AnalysisBarrier and address comments --- .../org/apache/spark/sql/DataFrameWriter.scala | 4 ++-- .../spark/sql/execution/SparkStrategies.scala | 2 +- .../sql/execution/command/DataWritingCommand.scala | 2 +- .../spark/sql/hive/execution/HiveDDLSuite.scala | 13 +++++++++++++ .../spark/sql/hive/execution/SQLQuerySuite.scala | 13 ------------- 5 files changed, 17 insertions(+), 17 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala index bd216ffd91eb..3304f368e105 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala @@ -26,7 +26,7 @@ import org.apache.spark.annotation.InterfaceStability import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.{EliminateSubqueryAliases, UnresolvedRelation} import org.apache.spark.sql.catalyst.catalog._ -import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan} +import org.apache.spark.sql.catalyst.plans.logical.{AnalysisBarrier, InsertIntoTable, LogicalPlan} import org.apache.spark.sql.execution.SQLExecution import org.apache.spark.sql.execution.command.DDLUtils import org.apache.spark.sql.execution.datasources.{CreateTable, DataSource, LogicalRelation} @@ -264,7 +264,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { sparkSession = df.sparkSession, className = source, partitionColumns = partitioningColumns.getOrElse(Nil), - options = extraOptions.toMap).planForWriting(mode, df.logicalPlan) + options = extraOptions.toMap).planForWriting(mode, AnalysisBarrier(df.logicalPlan)) } } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index 85ada17d75ee..6b3f301aad3f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -27,7 +27,7 @@ import org.apache.spark.sql.catalyst.plans._ import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.plans.physical._ import org.apache.spark.sql.execution.columnar.{InMemoryRelation, InMemoryTableScanExec} -import org.apache.spark.sql.execution.command.{DataWritingCommand, _} +import org.apache.spark.sql.execution.command._ import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec import org.apache.spark.sql.execution.joins.{BuildLeft, BuildRight, BuildSide} import 
org.apache.spark.sql.execution.streaming._ diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala index 62983daec52b..e56f8105fc9a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala @@ -22,7 +22,7 @@ import org.apache.hadoop.conf.Configuration import org.apache.spark.SparkContext import org.apache.spark.sql.{Row, SparkSession} import org.apache.spark.sql.catalyst.expressions.Attribute -import org.apache.spark.sql.catalyst.plans.logical.{AnalysisBarrier, Command, LogicalPlan} +import org.apache.spark.sql.catalyst.plans.logical.{Command, LogicalPlan} import org.apache.spark.sql.execution.SparkPlan import org.apache.spark.sql.execution.datasources.BasicWriteJobStatsTracker import org.apache.spark.sql.execution.datasources.FileFormatWriter diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala index 6c11905ba890..f2e0c695ca38 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala @@ -2150,4 +2150,17 @@ class HiveDDLSuite assert(e.message.contains("LOAD DATA input path does not exist")) } } + + test("SPARK-22252: FileFormatWriter should respect the input query schema in HIVE") { + withTable("t1", "t2", "t3", "t4") { + spark.range(1).select('id as 'col1, 'id as 'col2).write.saveAsTable("t1") + spark.sql("select COL1, COL2 from t1").write.format("hive").saveAsTable("t2") + checkAnswer(spark.table("t2"), Row(0, 0)) + + // Test picking part of the columns when writing. + spark.range(1).select('id, 'id as 'col1, 'id as 'col2).write.saveAsTable("t3") + spark.sql("select COL1, COL2 from t3").write.format("hive").saveAsTable("t4") + checkAnswer(spark.table("t4"), Row(0, 0)) + } + } } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala index 63aacc840e31..07ae3ae94584 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala @@ -1970,19 +1970,6 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton { } } - test("SPARK-22252: FileFormatWriter should respect the input query schema in HIVE") { - withTable("t1", "t2", "t3", "t4") { - spark.range(1).select('id as 'col1, 'id as 'col2).write.saveAsTable("t1") - spark.sql("select COL1, COL2 from t1").write.format("hive").saveAsTable("t2") - checkAnswer(spark.table("t2"), Row(0, 0)) - - // Test picking part of the columns when writing. - spark.range(1).select('id, 'id as 'col1, 'id as 'col2).write.saveAsTable("t3") - spark.sql("select COL1, COL2 from t3").write.format("hive").saveAsTable("t4") - checkAnswer(spark.table("t4"), Row(0, 0)) - } - } - test("Auto alias construction of get_json_object") { val df = Seq(("1", """{"f1": "value1", "f5": 5.23}""")).toDF("key", "jstring") val expectedMsg = "Cannot create a table having a column whose name contains commas " +
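
// Illustrative sketch (not part of the patch above): a self-contained version of
// the behaviour the relocated SPARK-22252 test guards. Assumes a local
// Hive-enabled session with a writable warehouse; table names are illustrative
// and only the first half of the test is reproduced.
import org.apache.spark.sql.{Row, SparkSession}

object SchemaCaseSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("schema-case-sketch")
      .enableHiveSupport()
      .getOrCreate()
    import spark.implicits._

    spark.range(1).select('id as 'col1, 'id as 'col2).write.saveAsTable("t1")
    // The projection references the columns in upper case; analysis resolves them
    // case-insensitively, and the Hive write must keep that analyzed schema rather
    // than re-deriving (and potentially re-casing) it in a second analysis pass.
    spark.sql("select COL1, COL2 from t1").write.format("hive").saveAsTable("t2")
    assert(spark.table("t2").collect().toSeq == Seq(Row(0L, 0L)))

    spark.stop()
  }
}
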