9 changes: 9 additions & 0 deletions core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -991,6 +991,15 @@ private[spark] object Utils extends Logging {
}
}

/**
* Lists all files and directories under the given directory, recursively.
*/
def recursiveList(f: File): Array[File] = {
require(f.isDirectory)
val current = f.listFiles
current ++ current.filter(_.isDirectory).flatMap(recursiveList)
}
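
As a quick aside (not part of this diff), here is a hypothetical usage sketch for the new helper; the output path is invented, and since `Utils` is private[spark] this only compiles from Spark-internal code:

import java.io.File

// List everything under a directory; the result contains files AND
// directories, at all depths.
val outputDir = new File("/tmp/spark-output")  // illustrative path, must be a directory
val entries = Utils.recursiveList(outputDir)
val files = entries.filter(_.isFile)
println(s"${files.length} files, ${files.map(_.length).sum} bytes")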

/**
* Delete a file or directory and its contents recursively.
* Don't follow directories if they are symlinks.
@@ -906,3 +906,8 @@ case class Deduplicate(

override def output: Seq[Attribute] = child.output
}

/** A logical plan for writing out data files. */
case class WriteDataFileOut(child: LogicalPlan) extends UnaryNode {
override def output: Seq[Attribute] = child.output
}
@@ -346,8 +346,13 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
// Can we automate these 'pass through' operations?
object BasicOperators extends Strategy {
def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
case w: WriteDataOutCommand => WrittenDataCommandExec(w) :: Nil

case r: RunnableCommand => ExecutedCommandExec(r) :: Nil

case WriteDataFileOut(child) =>
WriteDataFileOutExec(planLater(child)) :: Nil

case MemoryPlan(sink, output) =>
val encoder = RowEncoder(sink.schema)
LocalTableScanExec(output, sink.allData.map(r => encoder.toRow(r).copy())) :: Nil
@@ -18,32 +18,47 @@
package org.apache.spark.sql.execution.command

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Dataset, Row, SparkSession}
import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
import org.apache.spark.sql.catalyst.errors.TreeNodeException
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
import org.apache.spark.sql.catalyst.plans.QueryPlan
import org.apache.spark.sql.catalyst.plans.logical
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, WriteDataFileOut}
import org.apache.spark.sql.execution.{QueryExecution, SparkPlan, SQLExecution}
import org.apache.spark.sql.execution.datasources.ExecutedWriteSummary
import org.apache.spark.sql.execution.debug._
import org.apache.spark.sql.execution.metric.SQLMetrics
import org.apache.spark.sql.execution.streaming.{IncrementalExecution, OffsetSeqMetadata}
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.types._

/**
* A logical command that is executed for its side effects. `RunnableCommand`s are
* wrapped in `ExecutedCommandExec` during execution.
*/
trait RunnableCommand extends logical.Command {
def run(sparkSession: SparkSession): Seq[Row]
}
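
For context, a minimal `RunnableCommand` (hypothetical, not part of this change) performs its side effect inside `run` and typically returns no rows:

// Illustrative only: a side-effecting command with no result rows.
case class SetLogLevelCommand(level: String) extends RunnableCommand {
  override def run(sparkSession: SparkSession): Seq[Row] = {
    sparkSession.sparkContext.setLogLevel(level)  // the side effect
    Seq.empty[Row]
  }
}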

/**
* A logical command specialized for writing data out. `WriteDataOutCommand`s are
* wrapped in `WrittenDataCommandExec` during execution.
*/
trait WriteDataOutCommand extends logical.Command {
// The query plan that represents the data to be written out.
val query: LogicalPlan

def run(
    sparkSession: SparkSession,
    queryExecution: QueryExecution,
    callback: Seq[ExecutedWriteSummary] => Unit): Seq[Row]

override protected def innerChildren: Seq[LogicalPlan] = query :: Nil
}
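
To make the new contract concrete, here is a hypothetical implementation sketch. The command name and its "write" are invented, and it assumes `ExecutedWriteSummary` is a case class exposing the `updatedPartitions`, `writtenFileNum` and `writtenBytes` fields that `updateDriverMetrics` reads below:

// Illustrative only: run the wrapped plan (which contains WriteDataFileOutExec)
// and report one write summary through the callback so the wrapping exec node
// can update the driver-side metrics.
case class WriteToDevNullCommand(query: LogicalPlan) extends WriteDataOutCommand {
  override def run(
      sparkSession: SparkSession,
      queryExecution: QueryExecution,
      callback: Seq[ExecutedWriteSummary] => Unit): Seq[Row] = {
    val rowCount = queryExecution.executedPlan.execute().count()  // "write" the data
    callback(Seq(ExecutedWriteSummary(  // field names assumed from usage below
      updatedPartitions = Set.empty, writtenFileNum = 1, writtenBytes = rowCount)))
    Seq.empty[Row]
  }
}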

/**
* The base physical operator for commands that are executed eagerly for their side effects;
* concrete subclasses compute `sideEffectResult` exactly once.
*/
trait CommandExec extends SparkPlan {

val cmd: logical.Command

/**
* A concrete command should override this lazy field to wrap up any side effects caused by the
* command or any other computation that should be evaluated exactly once. The value of this field
@@ -53,10 +68,7 @@ case class ExecutedCommandExec(cmd: RunnableCommand) extends SparkPlan {
* The `execute()` method of all the physical command classes should reference `sideEffectResult`
* so that the command can be executed eagerly right after the command query is created.
*/
protected[sql] val sideEffectResult: Seq[InternalRow]

override protected def innerChildren: Seq[QueryPlan[_]] = cmd :: Nil

@@ -75,6 +87,110 @@ case class ExecutedCommandExec(cmd: RunnableCommand) extends SparkPlan {
}
}

/**
* A physical operator that executes the run method of a `RunnableCommand` and
* saves the result to prevent multiple executions.
*/
case class ExecutedCommandExec(cmd: RunnableCommand) extends CommandExec {
override protected[sql] lazy val sideEffectResult: Seq[InternalRow] = {
val converter = CatalystTypeConverters.createToCatalystConverter(schema)
cmd.run(sqlContext.sparkSession).map(converter(_).asInstanceOf[InternalRow])
}
}

/**
* A physical operator that executes the run method of a `WriteDataOutCommand` and
* saves the result to prevent multiple executions.
*/
case class WrittenDataCommandExec(cmd: WriteDataOutCommand) extends CommandExec {

private def updateDriverMetrics(
queryExecution: QueryExecution,
writeTaskSummary: Seq[ExecutedWriteSummary]): Unit = {
var partitionNum = 0
var fileNum = 0
var fileBytes: Long = 0L

writeTaskSummary.foreach { summary =>
partitionNum += summary.updatedPartitions.size
fileNum += summary.writtenFileNum
fileBytes += summary.writtenBytes
}

val writeOutPlan = queryExecution.executedPlan.collect {
case w: WriteDataFileOutExec => w
}.head

val partitionMetric = writeOutPlan.metrics("dynamicPartNum")
val fileNumMetric = writeOutPlan.metrics("fileNum")
val fileBytesMetric = writeOutPlan.metrics("fileBytes")
partitionMetric.add(partitionNum)
fileNumMetric.add(fileNum)
fileBytesMetric.add(fileBytes)

val executionId = sqlContext.sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
SQLMetrics.postDriverMetricUpdates(sqlContext.sparkContext, executionId,
partitionMetric :: fileNumMetric :: fileBytesMetric :: Nil)
}
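
A worked example of the aggregation (hypothetical values, same field assumptions as above): summaries are combined by plain summation, so a partition updated by two tasks is counted twice:

// partitionNum = 1 + 2 = 3, fileNum = 2 + 1 = 3, fileBytes = 100 + 50 = 150
val summaries = Seq(
  ExecutedWriteSummary(updatedPartitions = Set("p=1"), writtenFileNum = 2, writtenBytes = 100L),
  ExecutedWriteSummary(updatedPartitions = Set("p=1", "p=2"), writtenFileNum = 1, writtenBytes = 50L))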

override protected[sql] lazy val sideEffectResult: Seq[InternalRow] = {
// Wrap the query plan with an operator that tracks write metrics. The command will use
// this wrapped query plan when actually writing data out.
val writeDataOutQuery: LogicalPlan = WriteDataFileOut(cmd.query)

val queryExecution = Dataset.ofRows(sqlContext.sparkSession, writeDataOutQuery).queryExecution

// Associate the query execution with a SQL execution id
SQLExecution.withNewExecutionId(sqlContext.sparkSession, queryExecution) {
val startTime = System.nanoTime()

val converter = CatalystTypeConverters.createToCatalystConverter(schema)
val results = cmd.run(sqlContext.sparkSession, queryExecution,
updateDriverMetrics(queryExecution, _))
.map(converter(_).asInstanceOf[InternalRow])

val timeTakenMs = (System.nanoTime() - startTime) / 1000 / 1000
val writingTimeMetrics = queryExecution.executedPlan.collect {
case w: WriteDataFileOutExec => w
}.head.metrics("writingTime")
writingTimeMetrics.add(timeTakenMs)

val executionId = sqlContext.sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
SQLMetrics.postDriverMetricUpdates(sqlContext.sparkContext, executionId,
writingTimeMetrics :: Nil)

results
}
}
}

/**
* A physical operator that represents the action of writing out data files. It doesn't change
* the data, but associates metrics with the data writes for visibility.
*/
case class WriteDataFileOutExec(child: SparkPlan) extends SparkPlan {

override def output: Seq[Attribute] = child.output
override def children: Seq[SparkPlan] = child :: Nil

override lazy val metrics = Map(
"numOutputRows" -> SQLMetrics.createMetric(sqlContext.sparkContext, "number of output rows"),
"writingTime" -> SQLMetrics.createMetric(sqlContext.sparkContext, "writing data out time (ms)"),
"dynamicPartNum" -> SQLMetrics.createMetric(sqlContext.sparkContext, "number of dynamic part"),
"fileNum" -> SQLMetrics.createMetric(sqlContext.sparkContext, "number of written files"),
"fileBytes" -> SQLMetrics.createMetric(sqlContext.sparkContext, "bytes of written files"))

protected override def doExecute(): RDD[InternalRow] = {
val numOutputRows = longMetric("numOutputRows")
child.execute().mapPartitionsInternal { iter =>
iter.map { row =>
numOutputRows += 1
row
}
}
}
}
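
After a write has executed eagerly, the accumulated metrics can be read back from the planned operator. A minimal sketch, assuming a `queryExecution` obtained as in `sideEffectResult` above (metric names match the map defined in this class):

// Illustrative only: locate the metrics-tracking node and read its values.
val queryExecution: QueryExecution = ???  // e.g. from Dataset.ofRows(...).queryExecution
val writeExec = queryExecution.executedPlan.collect {
  case w: WriteDataFileOutExec => w
}.head
println(s"files written: ${writeExec.metrics("fileNum").value}, " +
  s"bytes written: ${writeExec.metrics("fileBytes").value}")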

/**
* An explain command for users to see how a command will be executed.
*