@@ -26,7 +26,7 @@ import org.apache.spark.annotation.InterfaceStability
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.{EliminateSubqueryAliases, UnresolvedRelation}
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan}
import org.apache.spark.sql.catalyst.plans.logical.{AnalysisBarrier, InsertIntoTable, LogicalPlan}
import org.apache.spark.sql.execution.SQLExecution
import org.apache.spark.sql.execution.command.DDLUtils
import org.apache.spark.sql.execution.datasources.{CreateTable, DataSource, LogicalRelation}
@@ -264,7 +264,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
sparkSession = df.sparkSession,
className = source,
partitionColumns = partitioningColumns.getOrElse(Nil),
options = extraOptions.toMap).planForWriting(mode, df.logicalPlan)
options = extraOptions.toMap).planForWriting(mode, AnalysisBarrier(df.logicalPlan))
}
}
}
@@ -403,6 +403,7 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
// Can we automate these 'pass through' operations?
object BasicOperators extends Strategy {
def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
case d: DataWritingCommand => DataWritingCommandExec(d, planLater(d.query)) :: Nil
case r: RunnableCommand => ExecutedCommandExec(r) :: Nil

case MemoryPlan(sink, output) =>
@@ -20,30 +20,32 @@ package org.apache.spark.sql.execution.command
import org.apache.hadoop.conf.Configuration

import org.apache.spark.SparkContext
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.{Command, LogicalPlan}
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.datasources.BasicWriteJobStatsTracker
import org.apache.spark.sql.execution.datasources.FileFormatWriter
import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
import org.apache.spark.util.SerializableConfiguration


/**
* A special `RunnableCommand` which writes data out and updates metrics.
* A special `Command` which writes data out and updates metrics.
*/
trait DataWritingCommand extends RunnableCommand {

trait DataWritingCommand extends Command {
/**
* The input query plan that produces the data to be written.
* IMPORTANT: the input query plan MUST be analyzed, so that we can carry its output columns
* to [[FileFormatWriter]].
*/
def query: LogicalPlan

// We make the input `query` an inner child instead of a child in order to hide it from the
// optimizer. This is because optimizer may not preserve the output schema names' case, and we
// have to keep the original analyzed plan here so that we can pass the corrected schema to the
// writer. The schema of analyzed plan is what user expects(or specifies), so we should respect
// it when writing.
override protected def innerChildren: Seq[LogicalPlan] = query :: Nil
Contributor: Now shall we define query as a child here?

override final def children: Seq[LogicalPlan] = query :: Nil

override lazy val metrics: Map[String, SQLMetric] = {
// Output columns of the analyzed input query plan
def outputColumns: Seq[Attribute]
Member: outputColumns changed from the analyzed plan to the optimized plan. For example:

withTempDir { dir =>
  val path = dir.getCanonicalPath
  val cnt = 30
  val table1Path = s"$path/table1"
  val table3Path = s"$path/table3"
  spark.range(cnt).selectExpr("cast(id as bigint) as col1", "cast(id % 3 as bigint) as col2")
    .write.mode(SaveMode.Overwrite).parquet(table1Path)
  withTable("table1", "table3") {
    spark.sql(
      s"CREATE TABLE table1(col1 bigint, col2 bigint) using parquet location '$table1Path/'")
    spark.sql("CREATE TABLE table3(COL1 bigint, COL2 bigint) using parquet " +
      "PARTITIONED BY (COL2) " +
      s"CLUSTERED BY (COL1) INTO 2 BUCKETS location '$table3Path/'")

    withView("view1") {
      spark.sql("CREATE VIEW view1 as select col1, col2 from table1 where col1 > -20")
      spark.sql("INSERT OVERWRITE TABLE table3 select COL1, COL2 from view1 CLUSTER BY COL1")
      spark.table("table3").show
    }
  }
}
outputColumns: List(COL1#19L, COL2#20L)
outputColumns: List(COL1#19L, COL2#20L)
outputColumns: List(COL1#19L, COL2#20L)
outputColumns: List(COL1#19L, COL2#20L)
outputColumns: List(COL1#19L, COL2#20L)
outputColumns: List(COL1#19L, COL2#20L)
outputColumns: List(COL1#19L, COL2#20L)
outputColumns: List(COL1#19L, COL2#20L)
outputColumns: List(col1#16L, col2#17L)
outputColumns: List(col1#16L, col2#17L)
outputColumns: List(col1#16L, col2#17L)


lazy val metrics: Map[String, SQLMetric] = {
val sparkContext = SparkContext.getActive.get
Map(
"numFiles" -> SQLMetrics.createMetric(sparkContext, "number of written files"),
@@ -57,4 +59,6 @@ trait DataWritingCommand extends RunnableCommand {
val serializableHadoopConf = new SerializableConfiguration(hadoopConf)
new BasicWriteJobStatsTracker(serializableHadoopConf, metrics)
}

def run(sparkSession: SparkSession, child: SparkPlan): Seq[Row]
}
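
To make the new contract concrete, here is a minimal hypothetical implementation of the trait (a sketch only, not part of this PR; the command name is made up): the analyzed `query` supplies `outputColumns`, while the already-planned physical `child` passed to `run` is what actually gets executed and written.

import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.command.DataWritingCommand

// Hypothetical command used only to illustrate the trait's contract.
case class DebugWriteCommand(
    query: LogicalPlan,
    outputColumns: Seq[Attribute]) extends DataWritingCommand {

  override def run(sparkSession: SparkSession, child: SparkPlan): Seq[Row] = {
    // A real command would hand `child` and `outputColumns` to FileFormatWriter.
    // Here we only execute the already-planned physical child; note that the
    // user-facing column names (including their case) travel via `outputColumns`,
    // not via whatever the optimizer did to the plan.
    val rows = child.executeCollect()
    assert(rows.forall(_.numFields == outputColumns.length))
    Seq.empty[Row]
  }
}
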
@@ -26,7 +26,7 @@ import org.apache.spark.sql.catalyst.errors.TreeNodeException
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
import org.apache.spark.sql.catalyst.plans.QueryPlan
import org.apache.spark.sql.catalyst.plans.logical.{Command, LogicalPlan}
import org.apache.spark.sql.execution.LeafExecNode
import org.apache.spark.sql.execution.{LeafExecNode, SparkPlan}
import org.apache.spark.sql.execution.debug._
import org.apache.spark.sql.execution.metric.SQLMetric
import org.apache.spark.sql.execution.streaming.{IncrementalExecution, OffsetSeqMetadata}
@@ -87,6 +87,42 @@ case class ExecutedCommandExec(cmd: RunnableCommand) extends LeafExecNode {
}
}
Member: I think the metrics in RunnableCommand are mainly for use in DataWritingCommand. Now that DataWritingCommand is a separate class from RunnableCommand, do we still need metrics in RunnableCommand?

Member Author: Well, I prefer to keep it. It is used only in ExecutedCommandExec, and if new RunnableCommands with metrics are added in the future, they can simply override metrics.
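
For illustration, such a future command could look like the following hypothetical sketch (not code from this PR); it relies on RunnableCommand keeping its overridable metrics map, which ExecutedCommandExec exposes via cmd.metrics. The command name and metric key are made up.

import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.execution.command.RunnableCommand
import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}

// Hypothetical command: it overrides the default (empty) metrics map so that
// ExecutedCommandExec picks the metrics up via `cmd.metrics`.
case class CleanupPathCommand(path: String) extends RunnableCommand {

  override lazy val metrics: Map[String, SQLMetric] = Map(
    "numPathsCleaned" -> SQLMetrics.createMetric(
      SparkSession.getActiveSession.get.sparkContext, "number of paths cleaned"))

  override def run(sparkSession: SparkSession): Seq[Row] = {
    // ... perform the side effect on `path` here ...
    metrics("numPathsCleaned").add(1)
    // A real command would also need to publish the updated values to the UI,
    // e.g. the way the writing commands do through their stats trackers.
    Seq.empty[Row]
  }
}
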


/**
* A physical operator that executes the run method of a `DataWritingCommand` and
* saves the result to prevent multiple executions.
*
* @param cmd the `DataWritingCommand` this operator will run.
* @param child the physical plan child ran by the `DataWritingCommand`.
*/
case class DataWritingCommandExec(cmd: DataWritingCommand, child: SparkPlan)
extends SparkPlan {

override lazy val metrics: Map[String, SQLMetric] = cmd.metrics

protected[sql] lazy val sideEffectResult: Seq[InternalRow] = {
val converter = CatalystTypeConverters.createToCatalystConverter(schema)
val rows = cmd.run(sqlContext.sparkSession, child)

rows.map(converter(_).asInstanceOf[InternalRow])
}

override def children: Seq[SparkPlan] = child :: Nil

override def output: Seq[Attribute] = cmd.output

override def nodeName: String = "Execute " + cmd.nodeName

override def executeCollect(): Array[InternalRow] = sideEffectResult.toArray

override def executeToIterator: Iterator[InternalRow] = sideEffectResult.toIterator

override def executeTake(limit: Int): Array[InternalRow] = sideEffectResult.take(limit).toArray

protected override def doExecute(): RDD[InternalRow] = {
sqlContext.sparkContext.parallelize(sideEffectResult, 1)
}
}

/**
* An explain command for users to see how a command will be executed.
*
@@ -456,17 +456,6 @@ case class DataSource(
val caseSensitive = sparkSession.sessionState.conf.caseSensitiveAnalysis
PartitioningUtils.validatePartitionColumn(data.schema, partitionColumns, caseSensitive)


// SPARK-17230: Resolve the partition columns so InsertIntoHadoopFsRelationCommand does
// not need to have the query as child, to avoid to analyze an optimized query,
// because InsertIntoHadoopFsRelationCommand will be optimized first.
val partitionAttributes = partitionColumns.map { name =>
data.output.find(a => equality(a.name, name)).getOrElse {
throw new AnalysisException(
s"Unable to resolve $name given [${data.output.map(_.name).mkString(", ")}]")
}
}

val fileIndex = catalogTable.map(_.identifier).map { tableIdent =>
sparkSession.table(tableIdent).queryExecution.analyzed.collect {
case LogicalRelation(t: HadoopFsRelation, _, _, _) => t.location
@@ -479,14 +468,15 @@ case class DataSource(
outputPath = outputPath,
staticPartitions = Map.empty,
ifPartitionNotExists = false,
partitionColumns = partitionAttributes,
partitionColumns = partitionColumns.map(UnresolvedAttribute.quoted),
bucketSpec = bucketSpec,
fileFormat = format,
options = options,
query = data,
Member: How about adding an AnalysisBarrier here?

Member Author: I think @cloud-fan wants the AnalysisBarrier to be in the DataWritingCommand trait, to make sure the query is analyzed. Adding an AnalysisBarrier here will not help; the query should already be analyzed at this point.

mode = mode,
catalogTable = catalogTable,
fileIndex = fileIndex)
fileIndex = fileIndex,
outputColumns = data.output)
}

/**
@@ -208,7 +208,8 @@ case class DataSourceAnalysis(conf: SQLConf) extends Rule[LogicalPlan] with Cast
actualQuery,
mode,
table,
Some(t.location))
Some(t.location),
actualQuery.output)
}
}

@@ -39,7 +39,7 @@ import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.expressions.{UnsafeProjection, _}
import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils}
import org.apache.spark.sql.execution.{QueryExecution, SortExec, SQLExecution}
import org.apache.spark.sql.execution.{SortExec, SparkPlan, SQLExecution}
import org.apache.spark.sql.types.StringType
import org.apache.spark.util.{SerializableConfiguration, Utils}

@@ -56,7 +56,9 @@ object FileFormatWriter extends Logging {

/** Describes how output files should be placed in the filesystem. */
case class OutputSpec(
outputPath: String, customPartitionLocations: Map[TablePartitionSpec, String])
outputPath: String,
customPartitionLocations: Map[TablePartitionSpec, String],
outputColumns: Seq[Attribute])

/** A shared job description for all the write tasks. */
private class WriteJobDescription(
@@ -101,7 +103,7 @@ object FileFormatWriter extends Logging {
*/
def write(
sparkSession: SparkSession,
queryExecution: QueryExecution,
plan: SparkPlan,
fileFormat: FileFormat,
committer: FileCommitProtocol,
outputSpec: OutputSpec,
@@ -117,11 +119,8 @@ object FileFormatWriter extends Logging {
job.setOutputValueClass(classOf[InternalRow])
FileOutputFormat.setOutputPath(job, new Path(outputSpec.outputPath))

// Pick the attributes from analyzed plan, as optimizer may not preserve the output schema
// names' case.
val allColumns = queryExecution.analyzed.output
val partitionSet = AttributeSet(partitionColumns)
val dataColumns = allColumns.filterNot(partitionSet.contains)
val dataColumns = outputSpec.outputColumns.filterNot(partitionSet.contains)

val bucketIdExpression = bucketSpec.map { spec =>
val bucketColumns = spec.bucketColumnNames.map(c => dataColumns.find(_.name == c).get)
@@ -144,7 +143,7 @@ object FileFormatWriter extends Logging {
uuid = UUID.randomUUID().toString,
serializableHadoopConf = new SerializableConfiguration(job.getConfiguration),
outputWriterFactory = outputWriterFactory,
allColumns = allColumns,
allColumns = outputSpec.outputColumns,
dataColumns = dataColumns,
partitionColumns = partitionColumns,
bucketIdExpression = bucketIdExpression,
@@ -160,7 +159,7 @@ object FileFormatWriter extends Logging {
// We should first sort by partition columns, then bucket id, and finally sorting columns.
val requiredOrdering = partitionColumns ++ bucketIdExpression ++ sortColumns
// the sort order doesn't matter
val actualOrdering = queryExecution.executedPlan.outputOrdering.map(_.child)
val actualOrdering = plan.outputOrdering.map(_.child)
val orderingMatched = if (requiredOrdering.length > actualOrdering.length) {
false
} else {
@@ -178,17 +177,18 @@ object FileFormatWriter extends Logging {

try {
val rdd = if (orderingMatched) {
queryExecution.toRdd
plan.execute()
} else {
// SPARK-21165: the `requiredOrdering` is based on the attributes from analyzed plan, and
// the physical plan may have different attribute ids due to optimizer removing some
// aliases. Here we bind the expression ahead to avoid potential attribute ids mismatch.
val orderingExpr = requiredOrdering
.map(SortOrder(_, Ascending)).map(BindReferences.bindReference(_, allColumns))
.map(SortOrder(_, Ascending))
.map(BindReferences.bindReference(_, outputSpec.outputColumns))
SortExec(
orderingExpr,
global = false,
child = queryExecution.executedPlan).execute()
child = plan).execute()
}
val ret = new Array[WriteTaskResult](rdd.partitions.length)
sparkSession.sparkContext.runJob(
@@ -27,6 +27,7 @@ import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogTable, CatalogT
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.command._
import org.apache.spark.sql.util.SchemaUtils

@@ -52,11 +53,12 @@ case class InsertIntoHadoopFsRelationCommand(
query: LogicalPlan,
mode: SaveMode,
catalogTable: Option[CatalogTable],
fileIndex: Option[FileIndex])
fileIndex: Option[FileIndex],
outputColumns: Seq[Attribute])
extends DataWritingCommand {
import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils.escapePathName

override def run(sparkSession: SparkSession): Seq[Row] = {
override def run(sparkSession: SparkSession, child: SparkPlan): Seq[Row] = {
// Most formats don't do well with duplicate columns, so lets not allow that
SchemaUtils.checkSchemaColumnNameDuplication(
query.schema,
@@ -139,11 +141,11 @@ case class InsertIntoHadoopFsRelationCommand(
val updatedPartitionPaths =
FileFormatWriter.write(
sparkSession = sparkSession,
queryExecution = Dataset.ofRows(sparkSession, query).queryExecution,
plan = child,
fileFormat = fileFormat,
committer = committer,
outputSpec = FileFormatWriter.OutputSpec(
qualifiedOutputPath.toString, customPartitionLocations),
qualifiedOutputPath.toString, customPartitionLocations, outputColumns),
hadoopConf = hadoopConf,
partitionColumns = partitionColumns,
bucketSpec = bucketSpec,
@@ -118,13 +118,14 @@ class FileStreamSink(
throw new RuntimeException(s"Partition column $col not found in schema ${data.schema}")
}
}
val qe = data.queryExecution

FileFormatWriter.write(
sparkSession = sparkSession,
queryExecution = data.queryExecution,
plan = qe.executedPlan,
fileFormat = fileFormat,
committer = committer,
outputSpec = FileFormatWriter.OutputSpec(path, Map.empty),
outputSpec = FileFormatWriter.OutputSpec(path, Map.empty, qe.analyzed.output),
hadoopConf = hadoopConf,
partitionColumns = partitionColumns,
bucketSpec = None,
@@ -148,7 +148,8 @@ object HiveAnalysis extends Rule[LogicalPlan] {
override def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
case InsertIntoTable(r: HiveTableRelation, partSpec, query, overwrite, ifPartitionNotExists)
if DDLUtils.isHiveTable(r.tableMeta) =>
InsertIntoHiveTable(r.tableMeta, partSpec, query, overwrite, ifPartitionNotExists)
InsertIntoHiveTable(r.tableMeta, partSpec, query, overwrite,
ifPartitionNotExists, query.output)

case CreateTable(tableDesc, mode, None) if DDLUtils.isHiveTable(tableDesc) =>
DDLUtils.checkDataColNames(tableDesc)
@@ -163,7 +164,7 @@ object HiveAnalysis extends Rule[LogicalPlan] {
val outputPath = new Path(storage.locationUri.get)
if (overwrite) DDLUtils.verifyNotReadPath(child, outputPath)

InsertIntoHiveDirCommand(isLocal, storage, child, overwrite)
InsertIntoHiveDirCommand(isLocal, storage, child, overwrite, child.output)
}
}

@@ -27,10 +27,12 @@ import org.apache.hadoop.hive.serde2.`lazy`.LazySimpleSerDe
import org.apache.hadoop.mapred._

import org.apache.spark.SparkException
import org.apache.spark.sql.{Dataset, Row, SparkSession}
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable}
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.hive.client.HiveClientImpl

/**
@@ -54,9 +56,10 @@ case class InsertIntoHiveDirCommand(
isLocal: Boolean,
storage: CatalogStorageFormat,
query: LogicalPlan,
overwrite: Boolean) extends SaveAsHiveFile {
overwrite: Boolean,
outputColumns: Seq[Attribute]) extends SaveAsHiveFile {

override def run(sparkSession: SparkSession): Seq[Row] = {
override def run(sparkSession: SparkSession, child: SparkPlan): Seq[Row] = {
assert(storage.locationUri.nonEmpty)

val hiveTable = HiveClientImpl.toHiveTable(CatalogTable(
@@ -98,10 +101,11 @@ case class InsertIntoHiveDirCommand(
try {
saveAsHiveFile(
sparkSession = sparkSession,
queryExecution = Dataset.ofRows(sparkSession, query).queryExecution,
plan = child,
hadoopConf = hadoopConf,
fileSinkConf = fileSinkConf,
outputLocation = tmpPath.toString)
outputLocation = tmpPath.toString,
allColumns = outputColumns)

val fs = writeToPath.getFileSystem(hadoopConf)
if (overwrite && fs.exists(writeToPath)) {