From 6074e36866b715954d04acd58ed0db9cdc650eaf Mon Sep 17 00:00:00 2001 From: LantaoJin Date: Thu, 13 Sep 2018 19:00:47 +0800 Subject: [PATCH 1/6] Abstract an output path field in trait DataWritingCommand --- .../spark/sql/execution/SparkPlanInfo.scala | 2 ++ .../execution/command/DataWritingCommand.scala | 3 +++ .../spark/sql/execution/command/commands.scala | 16 ++++++++++++++++ .../command/createDataSourceTables.scala | 5 ++++- .../sql/execution/datasources/DataSource.scala | 2 +- .../InsertIntoHadoopFsRelationCommand.scala | 14 ++++++++------ .../spark/sql/execution/SparkPlanSuite.scala | 9 +++++++++ .../CreateHiveTableAsSelectCommand.scala | 4 +++- .../execution/InsertIntoHiveDirCommand.scala | 3 ++- .../sql/hive/execution/InsertIntoHiveTable.scala | 2 ++ 10 files changed, 50 insertions(+), 10 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala index 59ffd16381116..3b4540f304dc9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql.execution import org.apache.spark.annotation.DeveloperApi +import org.apache.spark.sql.execution.command.{DataWritingCommand, DataWritingCommandExec} import org.apache.spark.sql.execution.exchange.ReusedExchangeExec import org.apache.spark.sql.execution.metric.SQLMetricInfo @@ -60,6 +61,7 @@ private[execution] object SparkPlanInfo { // dump the file scan metadata (e.g file path) to event log val metadata = plan match { case fileScan: FileSourceScanExec => fileScan.metadata + case writing: DataWritingCommandExec => writing.metadata case _ => Map[String, String]() } new SparkPlanInfo(plan.nodeName, plan.simpleString, children.map(fromSparkPlan), diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala index a1bb5af1ab723..296bbd141d0ab 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql.execution.command import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.Path import org.apache.spark.sql.{Row, SparkSession} import org.apache.spark.sql.catalyst.expressions.Attribute @@ -48,6 +49,8 @@ trait DataWritingCommand extends Command { def outputColumns: Seq[Attribute] = DataWritingCommand.logicalPlanOutputWithNames(query, outputColumnNames) + def outputPath: Option[Path] + lazy val metrics: Map[String, SQLMetric] = BasicWriteJobStatsTracker.metrics def basicWriteJobStatsTracker(hadoopConf: Configuration): BasicWriteJobStatsTracker = { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala index 2cc0e38adc2ee..0d5482716a7da 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala @@ -121,6 +121,22 @@ case class DataWritingCommandExec(cmd: DataWritingCommand, child: SparkPlan) protected override def doExecute(): RDD[InternalRow] = { sqlContext.sparkContext.parallelize(sideEffectResult, 1) } + + // Metadata that describes 
more details of this writing. + lazy val metadata: Map[String, String] = { + def seqToString(seq: Seq[Any]) = seq.mkString("[", ", ", "]") + val outputPath = cmd.outputPath match { + case Some(path) if path != null => path.toString + case _ => "" + } + val columnNames = cmd.outputColumnNames + val metadata = + Map( + "OutputColumnNames" -> seqToString(columnNames), + "OutputPath" -> outputPath + ) + metadata + } } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala index b2e1f530b5328..3587ef602bae6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala @@ -19,9 +19,10 @@ package org.apache.spark.sql.execution.command import java.net.URI +import org.apache.hadoop.fs.Path + import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.catalog._ -import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.execution.SparkPlan import org.apache.spark.sql.execution.datasources._ @@ -221,4 +222,6 @@ case class CreateDataSourceTableAsSelectCommand( throw ex } } + + override def outputPath: Option[Path] = table.storage.locationUri.map(new Path(_)) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala index ce3bc3dd48327..38092b334ccf8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala @@ -440,7 +440,7 @@ case class DataSource( // ordering of data.logicalPlan (partition columns are all moved after data column). This // will be adjusted within InsertIntoHadoopFsRelation. InsertIntoHadoopFsRelationCommand( - outputPath = outputPath, + outputFsPath = outputPath, staticPartitions = Map.empty, ifPartitionNotExists = false, partitionColumns = partitionColumns.map(UnresolvedAttribute.quoted), diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala index 484942d35c857..93029d0503f42 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala @@ -45,7 +45,7 @@ import org.apache.spark.sql.util.SchemaUtils * Only valid for static partitions. 
*/ case class InsertIntoHadoopFsRelationCommand( - outputPath: Path, + outputFsPath: Path, staticPartitions: TablePartitionSpec, ifPartitionNotExists: Boolean, partitionColumns: Seq[Attribute], @@ -64,12 +64,12 @@ case class InsertIntoHadoopFsRelationCommand( // Most formats don't do well with duplicate columns, so lets not allow that SchemaUtils.checkColumnNameDuplication( outputColumnNames, - s"when inserting into $outputPath", + s"when inserting into $outputFsPath", sparkSession.sessionState.conf.caseSensitiveAnalysis) val hadoopConf = sparkSession.sessionState.newHadoopConfWithOptions(options) - val fs = outputPath.getFileSystem(hadoopConf) - val qualifiedOutputPath = outputPath.makeQualified(fs.getUri, fs.getWorkingDirectory) + val fs = outputFsPath.getFileSystem(hadoopConf) + val qualifiedOutputPath = outputFsPath.makeQualified(fs.getUri, fs.getWorkingDirectory) val partitionsTrackedByCatalog = sparkSession.sessionState.conf.manageFilesourcePartitions && catalogTable.isDefined && @@ -106,7 +106,7 @@ case class InsertIntoHadoopFsRelationCommand( val committer = FileCommitProtocol.instantiate( sparkSession.sessionState.conf.fileCommitProtocolClass, jobId = java.util.UUID.randomUUID().toString, - outputPath = outputPath.toString, + outputPath = outputFsPath.toString, dynamicPartitionOverwrite = dynamicPartitionOverwrite) val doInsertion = (mode, pathExists) match { @@ -184,7 +184,7 @@ case class InsertIntoHadoopFsRelationCommand( // refresh cached files in FileIndex fileIndex.foreach(_.refresh()) // refresh data cache if table is cached - sparkSession.catalog.refreshByPath(outputPath.toString) + sparkSession.catalog.refreshByPath(outputFsPath.toString) if (catalogTable.nonEmpty) { CommandUtils.updateTableStats(sparkSession, catalogTable.get) @@ -261,4 +261,6 @@ case class InsertIntoHadoopFsRelationCommand( } }.toMap } + + override def outputPath: Option[Path] = Some(this.outputFsPath) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala index 47ff372992b91..4da58e44ee412 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala @@ -58,4 +58,13 @@ class SparkPlanSuite extends QueryTest with SharedSQLContext { assert(SparkPlanInfo.fromSparkPlan(f.queryExecution.sparkPlan).metadata.nonEmpty) } } + + test("SPARK-25421 DataWritingCommandExec should contains 'OutputPath' metadata") { + withTable("t") { + sql("CREATE TABLE t(col_I int) USING PARQUET") + val f = sql("INSERT OVERWRITE TABLE t SELECT 1") + assert(SparkPlanInfo.fromSparkPlan(f.queryExecution.sparkPlan).metadata + .contains("OutputPath")) + } + } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala index aa573b54a2b62..b336619016531 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala @@ -17,11 +17,11 @@ package org.apache.spark.sql.hive.execution +import org.apache.hadoop.fs.Path import scala.util.control.NonFatal import org.apache.spark.sql.{AnalysisException, Row, SaveMode, SparkSession} import org.apache.spark.sql.catalyst.catalog.CatalogTable -import 
org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.execution.SparkPlan import org.apache.spark.sql.execution.command.DataWritingCommand @@ -100,4 +100,6 @@ case class CreateHiveTableAsSelectCommand( s"TableName: ${tableDesc.identifier.table}, " + s"InsertIntoHiveTable]" } + + override def outputPath: Option[Path] = tableDesc.storage.locationUri.map(new Path(_)) } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveDirCommand.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveDirCommand.scala index 0c694910b06d4..7193b66e2e1af 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveDirCommand.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveDirCommand.scala @@ -30,7 +30,6 @@ import org.apache.spark.SparkException import org.apache.spark.sql.{Row, SparkSession} import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable} -import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.execution.SparkPlan import org.apache.spark.sql.hive.client.HiveClientImpl @@ -131,5 +130,7 @@ case class InsertIntoHiveDirCommand( Seq.empty[Row] } + + override def outputPath: Option[Path] = storage.locationUri.map(new Path(_)) } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala index 0ed464dad91b1..9843da5bd55d4 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala @@ -264,4 +264,6 @@ case class InsertIntoHiveTable( isSrcLocal = false) } } + + override def outputPath: Option[Path] = table.storage.locationUri.map(new Path(_)) } From 4de9f8d98eff9e9c42da20b7cf49768e8a994f2a Mon Sep 17 00:00:00 2001 From: LantaoJin Date: Sat, 15 Sep 2018 00:27:27 +0800 Subject: [PATCH 2/6] code review comments --- .../apache/spark/sql/execution/SparkPlanInfo.scala | 2 +- .../sql/execution/command/DataWritingCommand.scala | 2 +- .../spark/sql/execution/command/commands.scala | 2 +- .../execution/command/createDataSourceTables.scala | 2 +- .../sql/execution/datasources/DataSource.scala | 2 +- .../InsertIntoHadoopFsRelationCommand.scala | 14 +++++++------- .../execution/CreateHiveTableAsSelectCommand.scala | 2 +- .../hive/execution/InsertIntoHiveDirCommand.scala | 2 +- .../sql/hive/execution/InsertIntoHiveTable.scala | 2 +- 9 files changed, 15 insertions(+), 15 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala index 3b4540f304dc9..151400e93ce91 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala @@ -18,7 +18,7 @@ package org.apache.spark.sql.execution import org.apache.spark.annotation.DeveloperApi -import org.apache.spark.sql.execution.command.{DataWritingCommand, DataWritingCommandExec} +import org.apache.spark.sql.execution.command.DataWritingCommandExec import org.apache.spark.sql.execution.exchange.ReusedExchangeExec import 
org.apache.spark.sql.execution.metric.SQLMetricInfo diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala index 296bbd141d0ab..12bd8b5aa068f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala @@ -49,7 +49,7 @@ trait DataWritingCommand extends Command { def outputColumns: Seq[Attribute] = DataWritingCommand.logicalPlanOutputWithNames(query, outputColumnNames) - def outputPath: Option[Path] + def outputDir: Option[Path] lazy val metrics: Map[String, SQLMetric] = BasicWriteJobStatsTracker.metrics diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala index 0d5482716a7da..b91923f0847a2 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala @@ -125,7 +125,7 @@ case class DataWritingCommandExec(cmd: DataWritingCommand, child: SparkPlan) // Metadata that describes more details of this writing. lazy val metadata: Map[String, String] = { def seqToString(seq: Seq[Any]) = seq.mkString("[", ", ", "]") - val outputPath = cmd.outputPath match { + val outputPath = cmd.outputDir match { case Some(path) if path != null => path.toString case _ => "" } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala index 3587ef602bae6..93f08f664ed1c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala @@ -223,5 +223,5 @@ case class CreateDataSourceTableAsSelectCommand( } } - override def outputPath: Option[Path] = table.storage.locationUri.map(new Path(_)) + override def outputDir: Option[Path] = table.storage.locationUri.map(new Path(_)) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala index 38092b334ccf8..ce3bc3dd48327 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala @@ -440,7 +440,7 @@ case class DataSource( // ordering of data.logicalPlan (partition columns are all moved after data column). This // will be adjusted within InsertIntoHadoopFsRelation. 
InsertIntoHadoopFsRelationCommand( - outputFsPath = outputPath, + outputPath = outputPath, staticPartitions = Map.empty, ifPartitionNotExists = false, partitionColumns = partitionColumns.map(UnresolvedAttribute.quoted), diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala index 93029d0503f42..51e8d06966af6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala @@ -45,7 +45,7 @@ import org.apache.spark.sql.util.SchemaUtils * Only valid for static partitions. */ case class InsertIntoHadoopFsRelationCommand( - outputFsPath: Path, + outputPath: Path, staticPartitions: TablePartitionSpec, ifPartitionNotExists: Boolean, partitionColumns: Seq[Attribute], @@ -64,12 +64,12 @@ case class InsertIntoHadoopFsRelationCommand( // Most formats don't do well with duplicate columns, so lets not allow that SchemaUtils.checkColumnNameDuplication( outputColumnNames, - s"when inserting into $outputFsPath", + s"when inserting into $outputPath", sparkSession.sessionState.conf.caseSensitiveAnalysis) val hadoopConf = sparkSession.sessionState.newHadoopConfWithOptions(options) - val fs = outputFsPath.getFileSystem(hadoopConf) - val qualifiedOutputPath = outputFsPath.makeQualified(fs.getUri, fs.getWorkingDirectory) + val fs = outputPath.getFileSystem(hadoopConf) + val qualifiedOutputPath = outputPath.makeQualified(fs.getUri, fs.getWorkingDirectory) val partitionsTrackedByCatalog = sparkSession.sessionState.conf.manageFilesourcePartitions && catalogTable.isDefined && @@ -106,7 +106,7 @@ case class InsertIntoHadoopFsRelationCommand( val committer = FileCommitProtocol.instantiate( sparkSession.sessionState.conf.fileCommitProtocolClass, jobId = java.util.UUID.randomUUID().toString, - outputPath = outputFsPath.toString, + outputPath = outputPath.toString, dynamicPartitionOverwrite = dynamicPartitionOverwrite) val doInsertion = (mode, pathExists) match { @@ -184,7 +184,7 @@ case class InsertIntoHadoopFsRelationCommand( // refresh cached files in FileIndex fileIndex.foreach(_.refresh()) // refresh data cache if table is cached - sparkSession.catalog.refreshByPath(outputFsPath.toString) + sparkSession.catalog.refreshByPath(outputPath.toString) if (catalogTable.nonEmpty) { CommandUtils.updateTableStats(sparkSession, catalogTable.get) @@ -262,5 +262,5 @@ case class InsertIntoHadoopFsRelationCommand( }.toMap } - override def outputPath: Option[Path] = Some(this.outputFsPath) + override def outputDir: Option[Path] = Some(this.outputPath) } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala index b336619016531..ec1120c80500d 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala @@ -101,5 +101,5 @@ case class CreateHiveTableAsSelectCommand( s"InsertIntoHiveTable]" } - override def outputPath: Option[Path] = tableDesc.storage.locationUri.map(new Path(_)) + override def outputDir: Option[Path] = tableDesc.storage.locationUri.map(new Path(_)) } diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveDirCommand.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveDirCommand.scala index 7193b66e2e1af..3e9ee6d1e4e4d 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveDirCommand.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveDirCommand.scala @@ -131,6 +131,6 @@ case class InsertIntoHiveDirCommand( Seq.empty[Row] } - override def outputPath: Option[Path] = storage.locationUri.map(new Path(_)) + override def outputDir: Option[Path] = storage.locationUri.map(new Path(_)) } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala index 9843da5bd55d4..e28032a8f63a6 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala @@ -265,5 +265,5 @@ case class InsertIntoHiveTable( } } - override def outputPath: Option[Path] = table.storage.locationUri.map(new Path(_)) + override def outputDir: Option[Path] = table.storage.locationUri.map(new Path(_)) } From c0d6c833647a79db49a745f3429edd815f2b61a0 Mon Sep 17 00:00:00 2001 From: LantaoJin Date: Sat, 15 Sep 2018 00:31:03 +0800 Subject: [PATCH 3/6] rename OutputPath to OutputDir to avoid conflict --- .../org/apache/spark/sql/execution/command/commands.scala | 4 ++-- .../scala/org/apache/spark/sql/execution/SparkPlanSuite.scala | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala index b91923f0847a2..c24fb53e1f05e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala @@ -125,7 +125,7 @@ case class DataWritingCommandExec(cmd: DataWritingCommand, child: SparkPlan) // Metadata that describes more details of this writing. 
lazy val metadata: Map[String, String] = { def seqToString(seq: Seq[Any]) = seq.mkString("[", ", ", "]") - val outputPath = cmd.outputDir match { + val outputDir = cmd.outputDir match { case Some(path) if path != null => path.toString case _ => "" } @@ -133,7 +133,7 @@ case class DataWritingCommandExec(cmd: DataWritingCommand, child: SparkPlan) val metadata = Map( "OutputColumnNames" -> seqToString(columnNames), - "OutputPath" -> outputPath + "OutputDir" -> outputDir ) metadata } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala index 4da58e44ee412..f5e71280731e7 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala @@ -59,12 +59,12 @@ class SparkPlanSuite extends QueryTest with SharedSQLContext { } } - test("SPARK-25421 DataWritingCommandExec should contains 'OutputPath' metadata") { + test("SPARK-25421 DataWritingCommandExec should contains 'OutputDir' metadata") { withTable("t") { sql("CREATE TABLE t(col_I int) USING PARQUET") val f = sql("INSERT OVERWRITE TABLE t SELECT 1") assert(SparkPlanInfo.fromSparkPlan(f.queryExecution.sparkPlan).metadata - .contains("OutputPath")) + .contains("OutputDir")) } } } From 9850250c015a952ca2857f44ae21c8fa79c25d37 Mon Sep 17 00:00:00 2001 From: LantaoJin Date: Tue, 18 Sep 2018 21:33:16 +0800 Subject: [PATCH 4/6] code review comments --- .../spark/sql/execution/SparkPlanInfo.scala | 52 +++++++++++++++++-- .../command/DataWritingCommand.scala | 3 -- .../sql/execution/command/commands.scala | 16 ------ .../command/createDataSourceTables.scala | 4 -- .../InsertIntoHadoopFsRelationCommand.scala | 2 - .../spark/sql/execution/SparkPlanSuite.scala | 4 +- .../CreateHiveTableAsSelectCommand.scala | 3 -- .../execution/InsertIntoHiveDirCommand.scala | 2 - .../hive/execution/InsertIntoHiveTable.scala | 2 - 9 files changed, 51 insertions(+), 37 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala index 151400e93ce91..e5d7970932e9d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala @@ -17,10 +17,15 @@ package org.apache.spark.sql.execution +import org.apache.hadoop.fs.Path + import org.apache.spark.annotation.DeveloperApi -import org.apache.spark.sql.execution.command.DataWritingCommandExec +import org.apache.spark.sql.catalyst.catalog.CatalogTable +import org.apache.spark.sql.execution.command.{CreateDataSourceTableAsSelectCommand, DataWritingCommand, DataWritingCommandExec} +import org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand import org.apache.spark.sql.execution.exchange.ReusedExchangeExec import org.apache.spark.sql.execution.metric.SQLMetricInfo +import org.apache.spark.util.Utils /** * :: DeveloperApi :: @@ -58,13 +63,54 @@ private[execution] object SparkPlanInfo { new SQLMetricInfo(metric.name.getOrElse(key), metric.id, metric.metricType) } + def makeOutputMetadata( + path: Option[Path], + outputColumnNames: Seq[String]): Map[String, String] = { + val pathString = path match { + case Some(p) if p != null => p.toString + case _ => "" + } + Map("OutputPath" -> pathString, + "OutputColumnNames" -> outputColumnNames.mkString("[", ", ", "]") + ) + } + + def 
reflectTable(write: DataWritingCommand, className: String, field: String): CatalogTable = { + val tableField = Utils.classForName(className).getDeclaredField(field) + tableField.setAccessible(true) + tableField.get(write).asInstanceOf[CatalogTable] + } + // dump the file scan metadata (e.g file path) to event log - val metadata = plan match { + var metadata = plan match { case fileScan: FileSourceScanExec => fileScan.metadata - case writing: DataWritingCommandExec => writing.metadata + case DataWritingCommandExec(i: InsertIntoHadoopFsRelationCommand, _) => + makeOutputMetadata(Some(i.outputPath), i.outputColumnNames) + case DataWritingCommandExec(c: CreateDataSourceTableAsSelectCommand, _) => + makeOutputMetadata(c.table.storage.locationUri.map(new Path(_)), c.outputColumnNames) + case DataWritingCommandExec(d: DataWritingCommand, _) + if d.getClass.getCanonicalName == CREATE_HIVE_TABLE_AS_SELECT_COMMAND => + val table = reflectTable(d, CREATE_HIVE_TABLE_AS_SELECT_COMMAND, "tableDesc") + makeOutputMetadata(table.storage.locationUri.map(new Path(_)), d.outputColumnNames) + case DataWritingCommandExec(d: DataWritingCommand, _) + if d.getClass.getCanonicalName == INSERT_INTO_HIVE_DIR_COMMAND => + val table = reflectTable(d, INSERT_INTO_HIVE_DIR_COMMAND, "table") + makeOutputMetadata(table.storage.locationUri.map(new Path(_)), d.outputColumnNames) + case DataWritingCommandExec(d: DataWritingCommand, _) + if d.getClass.getCanonicalName == INSERT_INTO_HIVE_TABLE => + val table = reflectTable(d, INSERT_INTO_HIVE_TABLE, "table") + makeOutputMetadata(table.storage.locationUri.map(new Path(_)), d.outputColumnNames) case _ => Map[String, String]() } + new SparkPlanInfo(plan.nodeName, plan.simpleString, children.map(fromSparkPlan), metadata, metrics) } + + private val CREATE_HIVE_TABLE_AS_SELECT_COMMAND = + "org.apache.spark.sql.hive.execution.CreateHiveTableAsSelectCommand" + private val INSERT_INTO_HIVE_DIR_COMMAND = + "org.apache.spark.sql.hive.execution.InsertIntoHiveDirCommand" + private val INSERT_INTO_HIVE_TABLE = + "org.apache.spark.sql.hive.execution.InsertIntoHiveTable" } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala index 12bd8b5aa068f..a1bb5af1ab723 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala @@ -18,7 +18,6 @@ package org.apache.spark.sql.execution.command import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.Path import org.apache.spark.sql.{Row, SparkSession} import org.apache.spark.sql.catalyst.expressions.Attribute @@ -49,8 +48,6 @@ trait DataWritingCommand extends Command { def outputColumns: Seq[Attribute] = DataWritingCommand.logicalPlanOutputWithNames(query, outputColumnNames) - def outputDir: Option[Path] - lazy val metrics: Map[String, SQLMetric] = BasicWriteJobStatsTracker.metrics def basicWriteJobStatsTracker(hadoopConf: Configuration): BasicWriteJobStatsTracker = { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala index c24fb53e1f05e..2cc0e38adc2ee 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala @@ -121,22 +121,6 @@ case 
class DataWritingCommandExec(cmd: DataWritingCommand, child: SparkPlan) protected override def doExecute(): RDD[InternalRow] = { sqlContext.sparkContext.parallelize(sideEffectResult, 1) } - - // Metadata that describes more details of this writing. - lazy val metadata: Map[String, String] = { - def seqToString(seq: Seq[Any]) = seq.mkString("[", ", ", "]") - val outputDir = cmd.outputDir match { - case Some(path) if path != null => path.toString - case _ => "" - } - val columnNames = cmd.outputColumnNames - val metadata = - Map( - "OutputColumnNames" -> seqToString(columnNames), - "OutputDir" -> outputDir - ) - metadata - } } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala index 93f08f664ed1c..fba428a5e20f5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala @@ -19,8 +19,6 @@ package org.apache.spark.sql.execution.command import java.net.URI -import org.apache.hadoop.fs.Path - import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan @@ -222,6 +220,4 @@ case class CreateDataSourceTableAsSelectCommand( throw ex } } - - override def outputDir: Option[Path] = table.storage.locationUri.map(new Path(_)) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala index 51e8d06966af6..484942d35c857 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala @@ -261,6 +261,4 @@ case class InsertIntoHadoopFsRelationCommand( } }.toMap } - - override def outputDir: Option[Path] = Some(this.outputPath) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala index f5e71280731e7..4da58e44ee412 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala @@ -59,12 +59,12 @@ class SparkPlanSuite extends QueryTest with SharedSQLContext { } } - test("SPARK-25421 DataWritingCommandExec should contains 'OutputDir' metadata") { + test("SPARK-25421 DataWritingCommandExec should contains 'OutputPath' metadata") { withTable("t") { sql("CREATE TABLE t(col_I int) USING PARQUET") val f = sql("INSERT OVERWRITE TABLE t SELECT 1") assert(SparkPlanInfo.fromSparkPlan(f.queryExecution.sparkPlan).metadata - .contains("OutputDir")) + .contains("OutputPath")) } } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala index ec1120c80500d..060dc2916096f 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala @@ -17,7 +17,6 @@ package org.apache.spark.sql.hive.execution -import 
org.apache.hadoop.fs.Path import scala.util.control.NonFatal import org.apache.spark.sql.{AnalysisException, Row, SaveMode, SparkSession} @@ -100,6 +99,4 @@ case class CreateHiveTableAsSelectCommand( s"TableName: ${tableDesc.identifier.table}, " + s"InsertIntoHiveTable]" } - - override def outputDir: Option[Path] = tableDesc.storage.locationUri.map(new Path(_)) } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveDirCommand.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveDirCommand.scala index 3e9ee6d1e4e4d..022f349a34aca 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveDirCommand.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveDirCommand.scala @@ -130,7 +130,5 @@ case class InsertIntoHiveDirCommand( Seq.empty[Row] } - - override def outputDir: Option[Path] = storage.locationUri.map(new Path(_)) } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala index e28032a8f63a6..0ed464dad91b1 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala @@ -264,6 +264,4 @@ case class InsertIntoHiveTable( isSrcLocal = false) } } - - override def outputDir: Option[Path] = table.storage.locationUri.map(new Path(_)) } From adb12077c2384a610545c69b22b40ea512b5a17b Mon Sep 17 00:00:00 2001 From: LantaoJin Date: Tue, 18 Sep 2018 21:49:28 +0800 Subject: [PATCH 5/6] refine --- .../spark/sql/execution/SparkPlanInfo.scala | 34 +++++++++++-------- 1 file changed, 19 insertions(+), 15 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala index e5d7970932e9d..1ec638efb92b5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala @@ -82,24 +82,26 @@ private[execution] object SparkPlanInfo { } // dump the file scan metadata (e.g file path) to event log - var metadata = plan match { + val metadata = plan match { case fileScan: FileSourceScanExec => fileScan.metadata case DataWritingCommandExec(i: InsertIntoHadoopFsRelationCommand, _) => makeOutputMetadata(Some(i.outputPath), i.outputColumnNames) - case DataWritingCommandExec(c: CreateDataSourceTableAsSelectCommand, _) => - makeOutputMetadata(c.table.storage.locationUri.map(new Path(_)), c.outputColumnNames) - case DataWritingCommandExec(d: DataWritingCommand, _) - if d.getClass.getCanonicalName == CREATE_HIVE_TABLE_AS_SELECT_COMMAND => - val table = reflectTable(d, CREATE_HIVE_TABLE_AS_SELECT_COMMAND, "tableDesc") - makeOutputMetadata(table.storage.locationUri.map(new Path(_)), d.outputColumnNames) - case DataWritingCommandExec(d: DataWritingCommand, _) - if d.getClass.getCanonicalName == INSERT_INTO_HIVE_DIR_COMMAND => - val table = reflectTable(d, INSERT_INTO_HIVE_DIR_COMMAND, "table") - makeOutputMetadata(table.storage.locationUri.map(new Path(_)), d.outputColumnNames) - case DataWritingCommandExec(d: DataWritingCommand, _) - if d.getClass.getCanonicalName == INSERT_INTO_HIVE_TABLE => - val table = reflectTable(d, INSERT_INTO_HIVE_TABLE, "table") - makeOutputMetadata(table.storage.locationUri.map(new Path(_)), 
d.outputColumnNames) + case DataWritingCommandExec(d: DataWritingCommand, _) => + d.getClass.getCanonicalName match { + case CREATE_DATA_SOURCE_TABLE_AS_SELECT_COMMAND => + val table = reflectTable(d, CREATE_DATA_SOURCE_TABLE_AS_SELECT_COMMAND, "table") + makeOutputMetadata(table.storage.locationUri.map(new Path(_)), d.outputColumnNames) + case CREATE_HIVE_TABLE_AS_SELECT_COMMAND => + val table = reflectTable(d, CREATE_HIVE_TABLE_AS_SELECT_COMMAND, "tableDesc") + makeOutputMetadata(table.storage.locationUri.map(new Path(_)), d.outputColumnNames) + case INSERT_INTO_HIVE_DIR_COMMAND => + val table = reflectTable(d, INSERT_INTO_HIVE_DIR_COMMAND, "table") + makeOutputMetadata(table.storage.locationUri.map(new Path(_)), d.outputColumnNames) + case INSERT_INTO_HIVE_TABLE => + val table = reflectTable(d, INSERT_INTO_HIVE_TABLE, "table") + makeOutputMetadata(table.storage.locationUri.map(new Path(_)), d.outputColumnNames) + case _ => Map[String, String]() + } case _ => Map[String, String]() } @@ -107,6 +109,8 @@ private[execution] object SparkPlanInfo { metadata, metrics) } + private val CREATE_DATA_SOURCE_TABLE_AS_SELECT_COMMAND = + "org.apache.spark.sql.execution.command.CreateDataSourceTableAsSelectCommand" private val CREATE_HIVE_TABLE_AS_SELECT_COMMAND = "org.apache.spark.sql.hive.execution.CreateHiveTableAsSelectCommand" private val INSERT_INTO_HIVE_DIR_COMMAND = From 26365d62e8882d66b09bf90216b9ec6ec58dc89f Mon Sep 17 00:00:00 2001 From: LantaoJin Date: Fri, 21 Sep 2018 16:04:52 +0800 Subject: [PATCH 6/6] trivial --- .../scala/org/apache/spark/sql/execution/SparkPlanInfo.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala index 1ec638efb92b5..053c58e2e14a7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala @@ -110,7 +110,7 @@ private[execution] object SparkPlanInfo { } private val CREATE_DATA_SOURCE_TABLE_AS_SELECT_COMMAND = - "org.apache.spark.sql.execution.command.CreateDataSourceTableAsSelectCommand" + classOf[CreateDataSourceTableAsSelectCommand].getCanonicalName private val CREATE_HIVE_TABLE_AS_SELECT_COMMAND = "org.apache.spark.sql.hive.execution.CreateHiveTableAsSelectCommand" private val INSERT_INTO_HIVE_DIR_COMMAND =
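
Note on the approach in patches 4-6: the outputPath/outputDir member added to DataWritingCommand in patches 1-3 is dropped, and SparkPlanInfo builds the "OutputPath"/"OutputColumnNames" metadata itself. InsertIntoHadoopFsRelationCommand is matched directly; every other write command is recognized by its canonical class name and its table descriptor field ("table" or "tableDesc") is read through Java reflection, because the Hive commands live in sql/hive, which sql/core cannot reference at compile time. The sketch below is a minimal, self-contained illustration of that class-name dispatch plus field reflection; WriteLikeCommand, FakeHiveInsert and reflectField are hypothetical stand-ins invented for this note, not Spark classes.

    // Sketch only: stand-ins for DataWritingCommand and a command class that
    // the calling module cannot reference at compile time.
    trait WriteLikeCommand
    case class FakeHiveInsert(table: String, query: String) extends WriteLikeCommand

    object ReflectFieldDemo {
      // In Spark this would be the canonical name of a sql/hive command class;
      // here it is just the demo class above (default package, so getName
      // resolves with Class.forName).
      private val FAKE_HIVE_INSERT = classOf[FakeHiveInsert].getName

      // Mirrors the reflectTable helper added in patch 4: look up a declared
      // (constructor) field by name and read its value reflectively.
      private def reflectField[T](cmd: WriteLikeCommand, className: String, field: String): T = {
        val f = Class.forName(className).getDeclaredField(field)
        f.setAccessible(true)
        f.get(cmd).asInstanceOf[T]
      }

      def main(args: Array[String]): Unit = {
        val cmd: WriteLikeCommand = FakeHiveInsert("hdfs://ns/warehouse/t1", "SELECT 1")
        // Dispatch on the runtime class name, as SparkPlanInfo does after patch 5.
        val metadata = cmd.getClass.getName match {
          case FAKE_HIVE_INSERT =>
            Map(
              "OutputPath" -> reflectField[String](cmd, FAKE_HIVE_INSERT, "table"),
              "OutputColumnNames" -> Seq("col_I").mkString("[", ", ", "]"))
          case _ => Map.empty[String, String]
        }
        println(metadata) // Map(OutputPath -> hdfs://ns/warehouse/t1, OutputColumnNames -> [col_I])
      }
    }

The hard-coded class-name strings exist only because sql/core cannot depend on sql/hive; patch 6 switches the one constant that is visible from sql/core (CreateDataSourceTableAsSelectCommand) to classOf[...].getCanonicalName so it keeps working if that class is ever moved or renamed.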