From f60e2801ba79eceefcc5c1dc2a9e1a3fbd026534 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Sun, 14 May 2017 11:11:40 +0000 Subject: [PATCH 1/3] Add an operator for writing data out. --- .../plans/logical/basicLogicalOperators.scala | 5 + .../spark/sql/execution/SparkStrategies.scala | 5 + .../sql/execution/command/commands.scala | 100 ++++++++++++++++-- .../datasources/FileFormatWriter.scala | 84 +++++++-------- .../InsertIntoHadoopFsRelationCommand.scala | 6 +- .../SaveIntoDataSourceCommand.scala | 8 +- .../CreateHiveTableAsSelectCommand.scala | 4 +- .../hive/execution/InsertIntoHiveTable.scala | 8 +- 8 files changed, 150 insertions(+), 70 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala index f663d7b8a8f7..223d04b713e3 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala @@ -906,3 +906,8 @@ case class Deduplicate( override def output: Seq[Attribute] = child.output } + +/** A logical plan for writing data out. */ +case class WriteDataOut(child: LogicalPlan) extends UnaryNode { + override def output: Seq[Attribute] = child.output +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index ca2f6dd7a84b..0762c1f200e3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -346,8 +346,13 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] { // Can we automate these 'pass through' operations? 
object BasicOperators extends Strategy { def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { + case w: WriteDataOutCommand => WrittenDataCommandExec(w) :: Nil + case r: RunnableCommand => ExecutedCommandExec(r) :: Nil + case WriteDataOut(child) => + WriteDataOutExec(planLater(child)) :: Nil + case MemoryPlan(sink, output) => val encoder = RowEncoder(sink.schema) LocalTableScanExec(output, sink.allData.map(r => encoder.toRow(r).copy())) :: Nil diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala index 41d91d877d4c..c2be2f3b08d0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala @@ -18,21 +18,22 @@ package org.apache.spark.sql.execution.command import org.apache.spark.rdd.RDD -import org.apache.spark.sql.{Row, SparkSession} +import org.apache.spark.sql.{Dataset, Row, SparkSession} import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow} import org.apache.spark.sql.catalyst.errors.TreeNodeException import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference} import org.apache.spark.sql.catalyst.plans.QueryPlan import org.apache.spark.sql.catalyst.plans.logical -import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan -import org.apache.spark.sql.execution.SparkPlan +import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, WriteDataOut} +import org.apache.spark.sql.execution.{SparkPlan, SQLExecution} import org.apache.spark.sql.execution.debug._ +import org.apache.spark.sql.execution.metric.SQLMetrics import org.apache.spark.sql.execution.streaming.{IncrementalExecution, OffsetSeqMetadata} import org.apache.spark.sql.streaming.OutputMode import org.apache.spark.sql.types._ /** - * A logical command that is executed for its side-effects. `RunnableCommand`s are + * A logical command that is executed for its side-effects. `RunnableCommand`s are * wrapped in `ExecutedCommand` during execution. */ trait RunnableCommand extends logical.Command { @@ -40,10 +41,24 @@ trait RunnableCommand extends logical.Command { } /** - * A physical operator that executes the run method of a `RunnableCommand` and - * saves the result to prevent multiple executions. + * A logical command specialized for writing data out. `WriteDataOutCommand`s are + * wrapped in `WriteDataOutCommand` during execution. */ -case class ExecutedCommandExec(cmd: RunnableCommand) extends SparkPlan { +trait WriteDataOutCommand extends RunnableCommand { + // The query plan that represents the data going to write out. + val query: LogicalPlan + + // Wraps the query plan with an operator to track its metrics. The commands will actually use + // this wrapped query plan when writing data out. + val writeDataOutQuery: LogicalPlan = WriteDataOut(query) + + override protected def innerChildren: Seq[LogicalPlan] = writeDataOutQuery :: Nil +} + +trait CommandExec extends SparkPlan { + + val cmd: RunnableCommand + /** * A concrete command should override this lazy field to wrap up any side effects caused by the * command or any other computation that should be evaluated exactly once. 
The value of this field @@ -53,10 +68,7 @@ case class ExecutedCommandExec(cmd: RunnableCommand) extends SparkPlan { * The `execute()` method of all the physical command classes should reference `sideEffectResult` * so that the command can be executed eagerly right after the command query is created. */ - protected[sql] lazy val sideEffectResult: Seq[InternalRow] = { - val converter = CatalystTypeConverters.createToCatalystConverter(schema) - cmd.run(sqlContext.sparkSession).map(converter(_).asInstanceOf[InternalRow]) - } + protected[sql] val sideEffectResult: Seq[InternalRow] override protected def innerChildren: Seq[QueryPlan[_]] = cmd :: Nil @@ -75,6 +87,72 @@ case class ExecutedCommandExec(cmd: RunnableCommand) extends SparkPlan { } } +/** + * A physical operator that executes the run method of a `RunnableCommand` and + * saves the result to prevent multiple executions. + */ +case class ExecutedCommandExec(cmd: RunnableCommand) extends CommandExec { + override protected[sql] lazy val sideEffectResult: Seq[InternalRow] = { + val converter = CatalystTypeConverters.createToCatalystConverter(schema) + cmd.run(sqlContext.sparkSession).map(converter(_).asInstanceOf[InternalRow]) + } +} + +/** + * A physical operator specialized to execute the run method of a `WriteDataOutCommand` and + * saves the result to prevent multiple executions. + */ +case class WrittenDataCommandExec(cmd: WriteDataOutCommand) extends CommandExec { + override protected[sql] lazy val sideEffectResult: Seq[InternalRow] = { + val queryExecution = Dataset.ofRows(sqlContext.sparkSession, cmd.writeDataOutQuery) + .queryExecution + + // Associate the query execution with a SQL execution id + SQLExecution.withNewExecutionId(sqlContext.sparkSession, queryExecution) { + val startTime = System.nanoTime() + + val converter = CatalystTypeConverters.createToCatalystConverter(schema) + val results = cmd.run(sqlContext.sparkSession).map(converter(_).asInstanceOf[InternalRow]) + + val timeTakenMs = (System.nanoTime() - startTime) / 1000 / 1000 + val writingTimeMetrics = queryExecution.executedPlan.collect { + case w: WriteDataOutExec => w + }.head.metrics("writingTime") + writingTimeMetrics.add(timeTakenMs) + + val executionId = sqlContext.sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY) + SQLMetrics.postDriverMetricUpdates(sqlContext.sparkContext, executionId, + writingTimeMetrics :: Nil) + + results + } + } +} + +/** + * A physical operator represents the action of writing data out. This operator doesn't change + * the data but associates metrics with data writes for visibility. + */ +case class WriteDataOutExec(child: SparkPlan) extends SparkPlan { + + override def output: Seq[Attribute] = child.output + override def children: Seq[SparkPlan] = child :: Nil + + override lazy val metrics = Map( + "numOutputRows" -> SQLMetrics.createMetric(sqlContext.sparkContext, "number of output rows"), + "writingTime" -> SQLMetrics.createMetric(sqlContext.sparkContext, "writing data out time (ms)")) + + protected override def doExecute(): RDD[InternalRow] = { + val numOutputRows = longMetric("numOutputRows") + child.execute().mapPartitionsInternal { iter => + iter.map{ row => + numOutputRows += 1 + row + } + } + } +} + /** * An explain command for users to see how a command will be executed. 
* diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala index 4ec09bff429c..01da455a517d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala @@ -161,50 +161,48 @@ object FileFormatWriter extends Logging { } } - SQLExecution.withNewExecutionId(sparkSession, queryExecution) { - // This call shouldn't be put into the `try` block below because it only initializes and - // prepares the job, any exception thrown from here shouldn't cause abortJob() to be called. - committer.setupJob(job) - - try { - val rdd = if (orderingMatched) { - queryExecution.toRdd - } else { - SortExec( - requiredOrdering.map(SortOrder(_, Ascending)), - global = false, - child = queryExecution.executedPlan).execute() - } - val ret = new Array[WriteTaskResult](rdd.partitions.length) - sparkSession.sparkContext.runJob( - rdd, - (taskContext: TaskContext, iter: Iterator[InternalRow]) => { - executeTask( - description = description, - sparkStageId = taskContext.stageId(), - sparkPartitionId = taskContext.partitionId(), - sparkAttemptNumber = taskContext.attemptNumber(), - committer, - iterator = iter) - }, - 0 until rdd.partitions.length, - (index, res: WriteTaskResult) => { - committer.onTaskCommit(res.commitMsg) - ret(index) = res - }) - - val commitMsgs = ret.map(_.commitMsg) - val updatedPartitions = ret.flatMap(_.updatedPartitions) - .distinct.map(PartitioningUtils.parsePathFragment) - - committer.commitJob(job, commitMsgs) - logInfo(s"Job ${job.getJobID} committed.") - refreshFunction(updatedPartitions) - } catch { case cause: Throwable => - logError(s"Aborting job ${job.getJobID}.", cause) - committer.abortJob(job) - throw new SparkException("Job aborted.", cause) + // This call shouldn't be put into the `try` block below because it only initializes and + // prepares the job, any exception thrown from here shouldn't cause abortJob() to be called. 
+ committer.setupJob(job) + + try { + val rdd = if (orderingMatched) { + queryExecution.toRdd + } else { + SortExec( + requiredOrdering.map(SortOrder(_, Ascending)), + global = false, + child = queryExecution.executedPlan).execute() } + val ret = new Array[WriteTaskResult](rdd.partitions.length) + sparkSession.sparkContext.runJob( + rdd, + (taskContext: TaskContext, iter: Iterator[InternalRow]) => { + executeTask( + description = description, + sparkStageId = taskContext.stageId(), + sparkPartitionId = taskContext.partitionId(), + sparkAttemptNumber = taskContext.attemptNumber(), + committer, + iterator = iter) + }, + 0 until rdd.partitions.length, + (index, res: WriteTaskResult) => { + committer.onTaskCommit(res.commitMsg) + ret(index) = res + }) + + val commitMsgs = ret.map(_.commitMsg) + val updatedPartitions = ret.flatMap(_.updatedPartitions) + .distinct.map(PartitioningUtils.parsePathFragment) + + committer.commitJob(job, commitMsgs) + logInfo(s"Job ${job.getJobID} committed.") + refreshFunction(updatedPartitions) + } catch { case cause: Throwable => + logError(s"Aborting job ${job.getJobID}.", cause) + committer.abortJob(job) + throw new SparkException("Job aborted.", cause) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala index 19b51d4d9530..ca178b34f897 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala @@ -49,12 +49,10 @@ case class InsertIntoHadoopFsRelationCommand( mode: SaveMode, catalogTable: Option[CatalogTable], fileIndex: Option[FileIndex]) - extends RunnableCommand { + extends WriteDataOutCommand { import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils.escapePathName - override protected def innerChildren: Seq[LogicalPlan] = query :: Nil - override def run(sparkSession: SparkSession): Seq[Row] = { // Most formats don't do well with duplicate columns, so lets not allow that if (query.schema.fieldNames.length != query.schema.fieldNames.distinct.length) { @@ -136,7 +134,7 @@ case class InsertIntoHadoopFsRelationCommand( FileFormatWriter.write( sparkSession = sparkSession, - queryExecution = Dataset.ofRows(sparkSession, query).queryExecution, + queryExecution = Dataset.ofRows(sparkSession, writeDataOutQuery).queryExecution, fileFormat = fileFormat, committer = committer, outputSpec = FileFormatWriter.OutputSpec( diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SaveIntoDataSourceCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SaveIntoDataSourceCommand.scala index 6f19ea195c0c..08bdbafba2c1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SaveIntoDataSourceCommand.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SaveIntoDataSourceCommand.scala @@ -20,7 +20,7 @@ package org.apache.spark.sql.execution.datasources import org.apache.spark.sql.{Dataset, Row, SaveMode, SparkSession} import org.apache.spark.sql.catalyst.plans.QueryPlan import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan -import org.apache.spark.sql.execution.command.RunnableCommand +import org.apache.spark.sql.execution.command.WriteDataOutCommand /** * Saves the results of `query` in to a data source. 
@@ -36,16 +36,14 @@ case class SaveIntoDataSourceCommand( provider: String, partitionColumns: Seq[String], options: Map[String, String], - mode: SaveMode) extends RunnableCommand { - - override protected def innerChildren: Seq[QueryPlan[_]] = Seq(query) + mode: SaveMode) extends WriteDataOutCommand { override def run(sparkSession: SparkSession): Seq[Row] = { DataSource( sparkSession, className = provider, partitionColumns = partitionColumns, - options = options).write(mode, Dataset.ofRows(sparkSession, query)) + options = options).write(mode, Dataset.ofRows(sparkSession, writeDataOutQuery)) Seq.empty[Row] } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala index 41c6b18e9d79..206daf0310c2 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala @@ -23,7 +23,7 @@ import org.apache.spark.sql.{AnalysisException, Row, SaveMode, SparkSession} import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation import org.apache.spark.sql.catalyst.catalog.CatalogTable import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan} -import org.apache.spark.sql.execution.command.RunnableCommand +import org.apache.spark.sql.execution.command.WriteDataOutCommand /** @@ -37,7 +37,7 @@ case class CreateHiveTableAsSelectCommand( tableDesc: CatalogTable, query: LogicalPlan, mode: SaveMode) - extends RunnableCommand { + extends WriteDataOutCommand { private val tableIdentifier = tableDesc.identifier diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala index 3682dc850790..ece1610cfe3e 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala @@ -36,7 +36,7 @@ import org.apache.spark.sql.{AnalysisException, Dataset, Row, SparkSession} import org.apache.spark.sql.catalyst.catalog.CatalogTable import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan -import org.apache.spark.sql.execution.command.RunnableCommand +import org.apache.spark.sql.execution.command.WriteDataOutCommand import org.apache.spark.sql.execution.datasources.FileFormatWriter import org.apache.spark.sql.hive._ import org.apache.spark.sql.hive.HiveShim.{ShimFileSinkDesc => FileSinkDesc} @@ -78,9 +78,7 @@ case class InsertIntoHiveTable( partition: Map[String, Option[String]], query: LogicalPlan, overwrite: Boolean, - ifNotExists: Boolean) extends RunnableCommand { - - override protected def innerChildren: Seq[LogicalPlan] = query :: Nil + ifNotExists: Boolean) extends WriteDataOutCommand { var createdTempDir: Option[Path] = None @@ -310,7 +308,7 @@ case class InsertIntoHiveTable( FileFormatWriter.write( sparkSession = sparkSession, - queryExecution = Dataset.ofRows(sparkSession, query).queryExecution, + queryExecution = Dataset.ofRows(sparkSession, writeDataOutQuery).queryExecution, fileFormat = new HiveFileFormat(fileSinkConf), committer = committer, outputSpec = FileFormatWriter.OutputSpec(tmpLocation.toString, Map.empty), From eb8553e105f197c46e3e99745b8aa17dac8f1105 Mon Sep 17 00:00:00 2001 
From: Liang-Chi Hsieh Date: Mon, 15 May 2017 08:56:22 +0000 Subject: [PATCH 2/3] Set metrics for file-based relation. --- .../plans/logical/basicLogicalOperators.scala | 4 +- .../spark/sql/execution/SparkStrategies.scala | 4 +- .../sql/execution/command/commands.scala | 68 +++++++++++++++---- .../datasources/FileFormatWriter.scala | 57 ++++++++++++---- .../InsertIntoHadoopFsRelationCommand.scala | 10 ++- .../SaveIntoDataSourceCommand.scala | 6 +- .../CreateHiveTableAsSelectCommand.scala | 4 +- .../hive/execution/InsertIntoHiveTable.scala | 12 ++-- 8 files changed, 121 insertions(+), 44 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala index 223d04b713e3..133b1d1275cd 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala @@ -907,7 +907,7 @@ case class Deduplicate( override def output: Seq[Attribute] = child.output } -/** A logical plan for writing data out. */ -case class WriteDataOut(child: LogicalPlan) extends UnaryNode { +/** A logical plan for writing out data file. */ +case class WriteDataFileOut(child: LogicalPlan) extends UnaryNode { override def output: Seq[Attribute] = child.output } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index 0762c1f200e3..5a2fb79f3640 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -350,8 +350,8 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] { case r: RunnableCommand => ExecutedCommandExec(r) :: Nil - case WriteDataOut(child) => - WriteDataOutExec(planLater(child)) :: Nil + case WriteDataFileOut(child) => + WriteDataFileOutExec(planLater(child)) :: Nil case MemoryPlan(sink, output) => val encoder = RowEncoder(sink.schema) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala index c2be2f3b08d0..4c92436ae483 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala @@ -24,8 +24,9 @@ import org.apache.spark.sql.catalyst.errors.TreeNodeException import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference} import org.apache.spark.sql.catalyst.plans.QueryPlan import org.apache.spark.sql.catalyst.plans.logical -import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, WriteDataOut} -import org.apache.spark.sql.execution.{SparkPlan, SQLExecution} +import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, WriteDataFileOut} +import org.apache.spark.sql.execution.{QueryExecution, SparkPlan, SQLExecution} +import org.apache.spark.sql.execution.datasources.ExecutedWriteSummary import org.apache.spark.sql.execution.debug._ import org.apache.spark.sql.execution.metric.SQLMetrics import org.apache.spark.sql.execution.streaming.{IncrementalExecution, OffsetSeqMetadata} @@ -44,20 +45,19 @@ trait RunnableCommand extends logical.Command { * A logical command specialized for writing data out. 
`WriteDataOutCommand`s are * wrapped in `WriteDataOutCommand` during execution. */ -trait WriteDataOutCommand extends RunnableCommand { +trait WriteDataOutCommand extends logical.Command { // The query plan that represents the data going to write out. val query: LogicalPlan - // Wraps the query plan with an operator to track its metrics. The commands will actually use - // this wrapped query plan when writing data out. - val writeDataOutQuery: LogicalPlan = WriteDataOut(query) + def run(sparkSession: SparkSession, queryExecution: QueryExecution, + callback: (Seq[ExecutedWriteSummary]) => Unit): Seq[Row] - override protected def innerChildren: Seq[LogicalPlan] = writeDataOutQuery :: Nil + override protected def innerChildren: Seq[LogicalPlan] = query :: Nil } trait CommandExec extends SparkPlan { - val cmd: RunnableCommand + val cmd: logical.Command /** * A concrete command should override this lazy field to wrap up any side effects caused by the @@ -103,20 +103,55 @@ case class ExecutedCommandExec(cmd: RunnableCommand) extends CommandExec { * saves the result to prevent multiple executions. */ case class WrittenDataCommandExec(cmd: WriteDataOutCommand) extends CommandExec { + + private def updateDriverMetrics( + queryExecution: QueryExecution, + writeTaskSummary: Seq[ExecutedWriteSummary]): Unit = { + var partitionNum = 0 + var fileNum = 0 + var fileBytes: Long = 0L + + writeTaskSummary.foreach { summary => + partitionNum += summary.updatedPartitions.size + fileNum += summary.writtenFileNum + fileBytes += summary.writtenBytes + } + + val writeOutPlan = queryExecution.executedPlan.collect { + case w: WriteDataFileOutExec => w + }.head + + val partitionMetric = writeOutPlan.metrics("dynamicPartNum") + val fileNumMetric = writeOutPlan.metrics("fileNum") + val fileBytesMetric = writeOutPlan.metrics("fileBytes") + partitionMetric.add(partitionNum) + fileNumMetric.add(fileNum) + fileBytesMetric.add(fileBytes) + + val executionId = sqlContext.sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY) + SQLMetrics.postDriverMetricUpdates(sqlContext.sparkContext, executionId, + partitionMetric :: fileNumMetric :: fileBytesMetric :: Nil) + } + override protected[sql] lazy val sideEffectResult: Seq[InternalRow] = { - val queryExecution = Dataset.ofRows(sqlContext.sparkSession, cmd.writeDataOutQuery) - .queryExecution + // Wraps the query plan with an operator to track its metrics. The commands will actually use + // this wrapped query plan when writing data out. 
+ val writeDataOutQuery: LogicalPlan = WriteDataFileOut(cmd.query) + + val queryExecution = Dataset.ofRows(sqlContext.sparkSession, writeDataOutQuery).queryExecution // Associate the query execution with a SQL execution id SQLExecution.withNewExecutionId(sqlContext.sparkSession, queryExecution) { val startTime = System.nanoTime() val converter = CatalystTypeConverters.createToCatalystConverter(schema) - val results = cmd.run(sqlContext.sparkSession).map(converter(_).asInstanceOf[InternalRow]) + val results = cmd.run(sqlContext.sparkSession, queryExecution, + updateDriverMetrics(queryExecution, _)) + .map(converter(_).asInstanceOf[InternalRow]) val timeTakenMs = (System.nanoTime() - startTime) / 1000 / 1000 val writingTimeMetrics = queryExecution.executedPlan.collect { - case w: WriteDataOutExec => w + case w: WriteDataFileOutExec => w }.head.metrics("writingTime") writingTimeMetrics.add(timeTakenMs) @@ -130,17 +165,20 @@ case class WrittenDataCommandExec(cmd: WriteDataOutCommand) extends CommandExec } /** - * A physical operator represents the action of writing data out. This operator doesn't change + * A physical operator represents the action of writing out data files. This operator doesn't change * the data but associates metrics with data writes for visibility. */ -case class WriteDataOutExec(child: SparkPlan) extends SparkPlan { +case class WriteDataFileOutExec(child: SparkPlan) extends SparkPlan { override def output: Seq[Attribute] = child.output override def children: Seq[SparkPlan] = child :: Nil override lazy val metrics = Map( "numOutputRows" -> SQLMetrics.createMetric(sqlContext.sparkContext, "number of output rows"), - "writingTime" -> SQLMetrics.createMetric(sqlContext.sparkContext, "writing data out time (ms)")) + "writingTime" -> SQLMetrics.createMetric(sqlContext.sparkContext, "writing data out time (ms)"), + "dynamicPartNum" -> SQLMetrics.createMetric(sqlContext.sparkContext, "number of dynamic part"), + "fileNum" -> SQLMetrics.createMetric(sqlContext.sparkContext, "number of written files"), + "fileBytes" -> SQLMetrics.createMetric(sqlContext.sparkContext, "bytes of written files")) protected override def doExecute(): RDD[InternalRow] = { val numOutputRows = longMetric("numOutputRows") diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala index 01da455a517d..69e952417a70 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala @@ -22,7 +22,7 @@ import java.util.{Date, UUID} import scala.collection.mutable import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.Path +import org.apache.hadoop.fs.{FileSystem, Path} import org.apache.hadoop.mapreduce._ import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl @@ -81,7 +81,8 @@ object FileFormatWriter extends Logging { } /** The result of a successful write task. 
*/ - private case class WriteTaskResult(commitMsg: TaskCommitMessage, updatedPartitions: Set[String]) + private case class WriteTaskResult(commitMsg: TaskCommitMessage, + executedSummary: ExecutedWriteSummary) /** * Basic work flow of this command is: @@ -104,7 +105,7 @@ object FileFormatWriter extends Logging { partitionColumns: Seq[Attribute], bucketSpec: Option[BucketSpec], refreshFunction: (Seq[TablePartitionSpec]) => Unit, - options: Map[String, String]): Unit = { + options: Map[String, String]): Seq[ExecutedWriteSummary] = { val job = Job.getInstance(hadoopConf) job.setOutputKeyClass(classOf[Void]) @@ -193,12 +194,13 @@ object FileFormatWriter extends Logging { }) val commitMsgs = ret.map(_.commitMsg) - val updatedPartitions = ret.flatMap(_.updatedPartitions) + val updatedPartitions = ret.flatMap(_.executedSummary.updatedPartitions) .distinct.map(PartitioningUtils.parsePathFragment) committer.commitJob(job, commitMsgs) logInfo(s"Job ${job.getJobID} committed.") refreshFunction(updatedPartitions) + ret.map(_.executedSummary) } catch { case cause: Throwable => logError(s"Aborting job ${job.getJobID}.", cause) committer.abortJob(job) @@ -272,8 +274,18 @@ object FileFormatWriter extends Logging { * Writes data out to files, and then returns the list of partition strings written out. * The list of partitions is sent back to the driver and used to update the catalog. */ - def execute(iterator: Iterator[InternalRow]): Set[String] + def execute(iterator: Iterator[InternalRow]): ExecutedWriteSummary def releaseResources(): Unit + + protected def getFileSize(conf: Configuration, filePath: String): Long = { + if (filePath != null) { + val path = new Path(filePath) + val fs = path.getFileSystem(conf) + fs.getFileStatus(path).getLen() + } else { + 0L + } + } } /** Writes data to a single directory (used for non-dynamic-partition writes). 
*/ @@ -283,23 +295,25 @@ object FileFormatWriter extends Logging { committer: FileCommitProtocol) extends ExecuteWriteTask { private[this] var currentWriter: OutputWriter = _ + private[this] var currentPath: String = _ private def newOutputWriter(fileCounter: Int): Unit = { val ext = description.outputWriterFactory.getFileExtension(taskAttemptContext) - val tmpFilePath = committer.newTaskTempFile( + currentPath = committer.newTaskTempFile( taskAttemptContext, None, f"-c$fileCounter%03d" + ext) currentWriter = description.outputWriterFactory.newInstance( - path = tmpFilePath, + path = currentPath, dataSchema = description.dataColumns.toStructType, context = taskAttemptContext) } - override def execute(iter: Iterator[InternalRow]): Set[String] = { + override def execute(iter: Iterator[InternalRow]): ExecutedWriteSummary = { var fileCounter = 0 var recordsInFile: Long = 0L + var totalFileSize: Long = 0L newOutputWriter(fileCounter) while (iter.hasNext) { if (description.maxRecordsPerFile > 0 && recordsInFile >= description.maxRecordsPerFile) { @@ -309,6 +323,7 @@ object FileFormatWriter extends Logging { recordsInFile = 0 releaseResources() + totalFileSize += getFileSize(taskAttemptContext.getConfiguration, currentPath) newOutputWriter(fileCounter) } @@ -317,7 +332,9 @@ object FileFormatWriter extends Logging { recordsInFile += 1 } releaseResources() - Set.empty + totalFileSize += getFileSize(taskAttemptContext.getConfiguration, currentPath) + ExecutedWriteSummary(updatedPartitions = Set.empty, writtenFileNum = fileCounter + 1, + writtenBytes = totalFileSize) } override def releaseResources(): Unit = { @@ -343,6 +360,8 @@ object FileFormatWriter extends Logging { // currentWriter is initialized whenever we see a new key private var currentWriter: OutputWriter = _ + private var currentPath: String = _ + /** Expressions that given partition columns build a path string like: col1=val/col2=val/... */ private def partitionPathExpression: Seq[Expression] = { desc.partitionColumns.zipWithIndex.flatMap { case (c, i) => @@ -398,19 +417,19 @@ object FileFormatWriter extends Logging { case _ => None } - val path = if (customPath.isDefined) { + currentPath = if (customPath.isDefined) { committer.newTaskTempFileAbsPath(taskAttemptContext, customPath.get, ext) } else { committer.newTaskTempFile(taskAttemptContext, partDir, ext) } currentWriter = desc.outputWriterFactory.newInstance( - path = path, + path = currentPath, dataSchema = desc.dataColumns.toStructType, context = taskAttemptContext) } - override def execute(iter: Iterator[InternalRow]): Set[String] = { + override def execute(iter: Iterator[InternalRow]): ExecutedWriteSummary = { val getPartitionColsAndBucketId = UnsafeProjection.create( desc.partitionColumns ++ desc.bucketIdExpression, desc.allColumns) @@ -424,6 +443,8 @@ object FileFormatWriter extends Logging { // If anything below fails, we should abort the task. 
var recordsInFile: Long = 0L var fileCounter = 0 + var totalFileCounter = 0 + var totalFileSize: Long = 0L var currentPartColsAndBucketId: UnsafeRow = null val updatedPartitions = mutable.Set[String]() for (row <- iter) { @@ -434,9 +455,11 @@ object FileFormatWriter extends Logging { logDebug(s"Writing partition: $currentPartColsAndBucketId") recordsInFile = 0 + totalFileCounter += fileCounter fileCounter = 0 releaseResources() + totalFileSize += getFileSize(taskAttemptContext.getConfiguration, currentPath) newOutputWriter(currentPartColsAndBucketId, getPartPath, fileCounter, updatedPartitions) } else if (desc.maxRecordsPerFile > 0 && recordsInFile >= desc.maxRecordsPerFile) { @@ -448,6 +471,7 @@ object FileFormatWriter extends Logging { s"File counter $fileCounter is beyond max value $MAX_FILE_COUNTER") releaseResources() + totalFileSize += getFileSize(taskAttemptContext.getConfiguration, currentPath) newOutputWriter(currentPartColsAndBucketId, getPartPath, fileCounter, updatedPartitions) } @@ -455,7 +479,9 @@ object FileFormatWriter extends Logging { recordsInFile += 1 } releaseResources() - updatedPartitions.toSet + totalFileSize += getFileSize(taskAttemptContext.getConfiguration, currentPath) + ExecutedWriteSummary(updatedPartitions = updatedPartitions.toSet, + writtenFileNum = totalFileCounter + 1, writtenBytes = totalFileSize) } override def releaseResources(): Unit = { @@ -469,3 +495,8 @@ object FileFormatWriter extends Logging { } } } + +case class ExecutedWriteSummary( + updatedPartitions: Set[String], + writtenFileNum: Int, + writtenBytes: Long) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala index ca178b34f897..f035048dc793 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala @@ -27,6 +27,7 @@ import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogTable, CatalogT import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.execution.QueryExecution import org.apache.spark.sql.execution.command._ /** @@ -53,7 +54,8 @@ case class InsertIntoHadoopFsRelationCommand( import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils.escapePathName - override def run(sparkSession: SparkSession): Seq[Row] = { + override def run(sparkSession: SparkSession, queryExecution: QueryExecution, + callback: (Seq[ExecutedWriteSummary]) => Unit): Seq[Row] = { // Most formats don't do well with duplicate columns, so lets not allow that if (query.schema.fieldNames.length != query.schema.fieldNames.distinct.length) { val duplicateColumns = query.schema.fieldNames.groupBy(identity).collect { @@ -132,9 +134,9 @@ case class InsertIntoHadoopFsRelationCommand( } } - FileFormatWriter.write( + val writeSummary = FileFormatWriter.write( sparkSession = sparkSession, - queryExecution = Dataset.ofRows(sparkSession, writeDataOutQuery).queryExecution, + queryExecution = queryExecution, fileFormat = fileFormat, committer = committer, outputSpec = FileFormatWriter.OutputSpec( @@ -145,6 +147,8 @@ case class InsertIntoHadoopFsRelationCommand( refreshFunction = refreshPartitionsCallback, 
options = options) + callback(writeSummary) + // refresh cached files in FileIndex fileIndex.foreach(_.refresh()) // refresh data cache if table is cached diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SaveIntoDataSourceCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SaveIntoDataSourceCommand.scala index 08bdbafba2c1..57ac42d14b5a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SaveIntoDataSourceCommand.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SaveIntoDataSourceCommand.scala @@ -20,7 +20,7 @@ package org.apache.spark.sql.execution.datasources import org.apache.spark.sql.{Dataset, Row, SaveMode, SparkSession} import org.apache.spark.sql.catalyst.plans.QueryPlan import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan -import org.apache.spark.sql.execution.command.WriteDataOutCommand +import org.apache.spark.sql.execution.command.RunnableCommand /** * Saves the results of `query` in to a data source. @@ -36,14 +36,14 @@ case class SaveIntoDataSourceCommand( provider: String, partitionColumns: Seq[String], options: Map[String, String], - mode: SaveMode) extends WriteDataOutCommand { + mode: SaveMode) extends RunnableCommand { override def run(sparkSession: SparkSession): Seq[Row] = { DataSource( sparkSession, className = provider, partitionColumns = partitionColumns, - options = options).write(mode, Dataset.ofRows(sparkSession, writeDataOutQuery)) + options = options).write(mode, Dataset.ofRows(sparkSession, query)) Seq.empty[Row] } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala index 206daf0310c2..41c6b18e9d79 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala @@ -23,7 +23,7 @@ import org.apache.spark.sql.{AnalysisException, Row, SaveMode, SparkSession} import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation import org.apache.spark.sql.catalyst.catalog.CatalogTable import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan} -import org.apache.spark.sql.execution.command.WriteDataOutCommand +import org.apache.spark.sql.execution.command.RunnableCommand /** @@ -37,7 +37,7 @@ case class CreateHiveTableAsSelectCommand( tableDesc: CatalogTable, query: LogicalPlan, mode: SaveMode) - extends WriteDataOutCommand { + extends RunnableCommand { private val tableIdentifier = tableDesc.identifier diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala index ece1610cfe3e..730aabfa2905 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala @@ -36,8 +36,9 @@ import org.apache.spark.sql.{AnalysisException, Dataset, Row, SparkSession} import org.apache.spark.sql.catalyst.catalog.CatalogTable import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.execution.QueryExecution import org.apache.spark.sql.execution.command.WriteDataOutCommand -import 
org.apache.spark.sql.execution.datasources.FileFormatWriter +import org.apache.spark.sql.execution.datasources.{ExecutedWriteSummary, FileFormatWriter} import org.apache.spark.sql.hive._ import org.apache.spark.sql.hive.HiveShim.{ShimFileSinkDesc => FileSinkDesc} import org.apache.spark.sql.hive.client.{HiveClientImpl, HiveVersion} @@ -215,7 +216,8 @@ case class InsertIntoHiveTable( * `org.apache.hadoop.hive.serde2.SerDe` and the * `org.apache.hadoop.mapred.OutputFormat` provided by the table definition. */ - override def run(sparkSession: SparkSession): Seq[Row] = { + override def run(sparkSession: SparkSession, queryExecution: QueryExecution, + callback: (Seq[ExecutedWriteSummary]) => Unit): Seq[Row] = { val sessionState = sparkSession.sessionState val externalCatalog = sparkSession.sharedState.externalCatalog val hiveVersion = externalCatalog.asInstanceOf[HiveExternalCatalog].client.version @@ -306,9 +308,9 @@ case class InsertIntoHiveTable( }.asInstanceOf[Attribute] } - FileFormatWriter.write( + val writeSummary = FileFormatWriter.write( sparkSession = sparkSession, - queryExecution = Dataset.ofRows(sparkSession, writeDataOutQuery).queryExecution, + queryExecution = queryExecution, fileFormat = new HiveFileFormat(fileSinkConf), committer = committer, outputSpec = FileFormatWriter.OutputSpec(tmpLocation.toString, Map.empty), @@ -318,6 +320,8 @@ case class InsertIntoHiveTable( refreshFunction = _ => (), options = Map.empty) + callback(writeSummary) + if (partition.nonEmpty) { if (numDynamicPartitions > 0) { externalCatalog.loadDynamicPartitions( From 6e50181b769445b928ac0bacbd5ba134277f2550 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Tue, 16 May 2017 09:57:13 +0000 Subject: [PATCH 3/3] Add tests for writing data metrics. --- .../scala/org/apache/spark/util/Utils.scala | 9 ++ .../datasources/FileFormatWriter.scala | 9 +- .../SaveIntoDataSourceCommand.scala | 2 + .../execution/metric/SQLMetricsSuite.scala | 110 +++++++++++++++++- .../sql/sources/PartitionedWriteSuite.scala | 21 ++-- 5 files changed, 134 insertions(+), 17 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index edfe22979232..a5eed5ba7d85 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -991,6 +991,15 @@ private[spark] object Utils extends Logging { } } + /** + * Lists files recursively. + */ + def recursiveList(f: File): Array[File] = { + require(f.isDirectory) + val current = f.listFiles + current ++ current.filter(_.isDirectory).flatMap(recursiveList) + } + /** * Delete a file or directory and its contents recursively. * Don't follow directories if they are symlinks. 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala index 69e952417a70..8ff8c8c62584 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala @@ -450,12 +450,14 @@ object FileFormatWriter extends Logging { for (row <- iter) { val nextPartColsAndBucketId = getPartitionColsAndBucketId(row) if (currentPartColsAndBucketId != nextPartColsAndBucketId) { + if (currentPartColsAndBucketId != null) { + totalFileCounter += (fileCounter + 1) + } // See a new partition or bucket - write to a new partition dir (or a new bucket file). currentPartColsAndBucketId = nextPartColsAndBucketId.copy() logDebug(s"Writing partition: $currentPartColsAndBucketId") recordsInFile = 0 - totalFileCounter += fileCounter fileCounter = 0 releaseResources() @@ -478,10 +480,13 @@ object FileFormatWriter extends Logging { currentWriter.write(getOutputRow(row)) recordsInFile += 1 } + if (currentPartColsAndBucketId != null) { + totalFileCounter += (fileCounter + 1) + } releaseResources() totalFileSize += getFileSize(taskAttemptContext.getConfiguration, currentPath) ExecutedWriteSummary(updatedPartitions = updatedPartitions.toSet, - writtenFileNum = totalFileCounter + 1, writtenBytes = totalFileSize) + writtenFileNum = totalFileCounter, writtenBytes = totalFileSize) } override def releaseResources(): Unit = { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SaveIntoDataSourceCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SaveIntoDataSourceCommand.scala index 57ac42d14b5a..6f19ea195c0c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SaveIntoDataSourceCommand.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SaveIntoDataSourceCommand.scala @@ -38,6 +38,8 @@ case class SaveIntoDataSourceCommand( options: Map[String, String], mode: SaveMode) extends RunnableCommand { + override protected def innerChildren: Seq[QueryPlan[_]] = Seq(query) + override def run(sparkSession: SparkSession): Seq[Row] = { DataSource( sparkSession, diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala index e544245588f4..026ce322d094 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala @@ -30,7 +30,7 @@ import org.apache.spark.sql.execution.ui.SparkPlanGraph import org.apache.spark.sql.functions._ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SharedSQLContext -import org.apache.spark.util.{AccumulatorContext, JsonProtocol} +import org.apache.spark.util.{AccumulatorContext, JsonProtocol, Utils} class SQLMetricsSuite extends SparkFunSuite with SharedSQLContext { import testImplicits._ @@ -91,6 +91,55 @@ class SQLMetricsSuite extends SparkFunSuite with SharedSQLContext { } } + /** + * Run the given function and return latest execution metrics. + * + * @param func the given function to run. 
+ */ + private def getLatestExecutionId(func: () => Unit): Long = { + val previousExecutionIds = spark.sharedState.listener.executionIdToData.keySet + // Run the given function to trigger query execution. + func() + sparkContext.listenerBus.waitUntilEmpty(10000) + val executionIds = + spark.sharedState.listener.executionIdToData.keySet.diff(previousExecutionIds) + assert(executionIds.size === 1) + executionIds.head + } + + /** + * Get execution metrics for the given execution id and verify metrics values. + * + * @param executionId the given execution id. + * @param expectedValues expected metrics values. If `None` is given for a metric, only verify + * its existence. + */ + private def verifyWriteDataMetrics(executionId: Long, expectedValues: Seq[Option[Int]]): Unit = { + val executionData = spark.sharedState.listener.getExecution(executionId).get + val writeDataNode = executionData.physicalPlanGraph.nodes.filter { node => + node.name == "WriteDataFileOut" + }.head + + val metricsNames = Seq( + "number of written files", + "number of dynamic part", + "bytes of written files", + "number of output rows", + "writing data out time (ms)") + + val metrics = + spark.sharedState.listener.getExecutionMetrics(executionId) + + metricsNames.zip(expectedValues).foreach { case (metricsName, expectedValue) => + val sqlMetric = writeDataNode.metrics.find(_.name == metricsName).get + val accumulatorId = sqlMetric.accumulatorId + val metricValue = metrics(accumulatorId) + expectedValue.foreach { expected => + assert(metricValue.toInt == expected) + } + } + } + test("LocalTableScanExec computes metrics in collect and take") { val df1 = spark.createDataset(Seq(1, 2, 3)) val logical = df1.queryExecution.logical @@ -288,6 +337,65 @@ class SQLMetricsSuite extends SparkFunSuite with SharedSQLContext { } } + test("WriteDataFileOutExec metrics") { + withTable("writeToTable") { + val executionId1 = getLatestExecutionId { () => + Seq((1, 2)).toDF("i", "j").write.mode("overwrite").saveAsTable("writeToTable") + } + // written 1 file, 1 row, 0 dynamic partition. + val expectedValues1 = Seq(Some(1), Some(0), None, Some(1), None) + verifyWriteDataMetrics(executionId1, expectedValues1) + + val executionId2 = getLatestExecutionId { () => + Seq((3, 4), (5, 6), (7, 8)).toDF("i", "j").repartition(1) + .write.insertInto("writeToTable") + } + // written 1 file, 3 rows, 0 dynamic partition. + val expectedValues2 = Seq(Some(1), Some(0), None, Some(3), None) + verifyWriteDataMetrics(executionId2, expectedValues2) + + val executionId3 = getLatestExecutionId { () => + Seq((9, 10), (11, 12)).toDF("i", "j").repartition(2) + .write.insertInto("writeToTable") + } + // written 2 files, 2 rows, 0 dynamic partition. + val expectedValues3 = Seq(Some(2), Some(0), None, Some(2), None) + verifyWriteDataMetrics(executionId3, expectedValues3) + } + } + + test("WriteDataFileOutExec metrics: dynamic partition") { + withTempDir { f => + val df = + spark.range(start = 0, end = 4, step = 1, numPartitions = 1).selectExpr("id", "id id1") + val executionId1 = getLatestExecutionId { () => + df + .write + .partitionBy("id") + .option("maxRecordsPerFile", 1) + .mode("overwrite") + .parquet(f.getAbsolutePath) + } + assert(Utils.recursiveList(f).count(_.getAbsolutePath.endsWith("parquet")) == 4) + // written 4 files, 4 rows, 4 dynamic partitions. 
+ val expectedValues1 = Seq(Some(4), Some(4), None, Some(4), None) + verifyWriteDataMetrics(executionId1, expectedValues1) + + val executionId2 = getLatestExecutionId { () => + df.union(df).repartition(2, $"id") + .write + .partitionBy("id") + .option("maxRecordsPerFile", 2) + .mode("overwrite") + .parquet(f.getAbsolutePath) + } + assert(Utils.recursiveList(f).count(_.getAbsolutePath.endsWith("parquet")) == 4) + // written 4 files, 8 rows, 4 dynamic partitions. + val expectedValues2 = Seq(Some(4), Some(4), None, Some(8), None) + verifyWriteDataMetrics(executionId2, expectedValues2) + } + } + test("save metrics") { withTempPath { file => val previousExecutionIds = spark.sharedState.listener.executionIdToData.keySet diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/PartitionedWriteSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/PartitionedWriteSuite.scala index a2f3afe3ce23..6f998aa60faf 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/PartitionedWriteSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/PartitionedWriteSuite.scala @@ -91,15 +91,15 @@ class PartitionedWriteSuite extends QueryTest with SharedSQLContext { withTempDir { f => spark.range(start = 0, end = 4, step = 1, numPartitions = 1) .write.option("maxRecordsPerFile", 1).mode("overwrite").parquet(f.getAbsolutePath) - assert(recursiveList(f).count(_.getAbsolutePath.endsWith("parquet")) == 4) + assert(Utils.recursiveList(f).count(_.getAbsolutePath.endsWith("parquet")) == 4) spark.range(start = 0, end = 4, step = 1, numPartitions = 1) .write.option("maxRecordsPerFile", 2).mode("overwrite").parquet(f.getAbsolutePath) - assert(recursiveList(f).count(_.getAbsolutePath.endsWith("parquet")) == 2) + assert(Utils.recursiveList(f).count(_.getAbsolutePath.endsWith("parquet")) == 2) spark.range(start = 0, end = 4, step = 1, numPartitions = 1) .write.option("maxRecordsPerFile", -1).mode("overwrite").parquet(f.getAbsolutePath) - assert(recursiveList(f).count(_.getAbsolutePath.endsWith("parquet")) == 1) + assert(Utils.recursiveList(f).count(_.getAbsolutePath.endsWith("parquet")) == 1) } } @@ -111,7 +111,7 @@ class PartitionedWriteSuite extends QueryTest with SharedSQLContext { .option("maxRecordsPerFile", 1) .mode("overwrite") .parquet(f.getAbsolutePath) - assert(recursiveList(f).count(_.getAbsolutePath.endsWith("parquet")) == 4) + assert(Utils.recursiveList(f).count(_.getAbsolutePath.endsWith("parquet")) == 4) } } @@ -138,14 +138,14 @@ class PartitionedWriteSuite extends QueryTest with SharedSQLContext { val df = Seq((1, ts)).toDF("i", "ts") withTempPath { f => df.write.partitionBy("ts").parquet(f.getAbsolutePath) - val files = recursiveList(f).filter(_.getAbsolutePath.endsWith("parquet")) + val files = Utils.recursiveList(f).filter(_.getAbsolutePath.endsWith("parquet")) assert(files.length == 1) checkPartitionValues(files.head, "2016-12-01 00:00:00") } withTempPath { f => df.write.option(DateTimeUtils.TIMEZONE_OPTION, "GMT") .partitionBy("ts").parquet(f.getAbsolutePath) - val files = recursiveList(f).filter(_.getAbsolutePath.endsWith("parquet")) + val files = Utils.recursiveList(f).filter(_.getAbsolutePath.endsWith("parquet")) assert(files.length == 1) // use timeZone option "GMT" to format partition value. 
checkPartitionValues(files.head, "2016-12-01 08:00:00") @@ -153,18 +153,11 @@ class PartitionedWriteSuite extends QueryTest with SharedSQLContext { withTempPath { f => withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> "GMT") { df.write.partitionBy("ts").parquet(f.getAbsolutePath) - val files = recursiveList(f).filter(_.getAbsolutePath.endsWith("parquet")) + val files = Utils.recursiveList(f).filter(_.getAbsolutePath.endsWith("parquet")) assert(files.length == 1) // if there isn't timeZone option, then use session local timezone. checkPartitionValues(files.head, "2016-12-01 08:00:00") } } } - - /** Lists files recursively. */ - private def recursiveList(f: File): Array[File] = { - require(f.isDirectory) - val current = f.listFiles - current ++ current.filter(_.isDirectory).flatMap(recursiveList) - } }
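
Illustrative usage sketch (not part of the patch series): the patches above move the file-writing commands from RunnableCommand onto a WriteDataOutCommand trait, planned as WrittenDataCommandExec, so that write metrics can be reported through WriteDataFileOutExec. The following minimal Scala sketch shows how a command could implement the patch-2 shape of that trait, assuming the patched APIs. The class name ExampleWriteCommand and the stubbed summary values are hypothetical; the trait signature, ExecutedWriteSummary, and the callback contract come from the patches.

import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.execution.command.WriteDataOutCommand
import org.apache.spark.sql.execution.datasources.ExecutedWriteSummary

// Hypothetical command, used only to illustrate the patched trait; the write logic is stubbed.
case class ExampleWriteCommand(query: LogicalPlan) extends WriteDataOutCommand {

  override def run(
      sparkSession: SparkSession,
      queryExecution: QueryExecution,
      callback: (Seq[ExecutedWriteSummary]) => Unit): Seq[Row] = {
    // A real command would hand `queryExecution` to a writer (e.g. FileFormatWriter.write in
    // these patches) and collect one ExecutedWriteSummary per write task from its return value.
    val summaries = Seq(
      ExecutedWriteSummary(updatedPartitions = Set.empty, writtenFileNum = 1, writtenBytes = 0L))

    // Invoking the callback is what lets WrittenDataCommandExec add the file, partition and
    // byte counts to the WriteDataFileOutExec metrics shown in the SQL UI.
    callback(summaries)
    Seq.empty[Row]
  }
}

In the patched planner, such a command is matched by the new strategy case and wrapped in WrittenDataCommandExec, which builds the WriteDataFileOut(query) plan, runs the command under a SQL execution id, and posts the driver-side metric updates (writingTime, dynamicPartNum, fileNum, fileBytes).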