
Commit d8b81f7

hvanhovell authored and rxin committed
[SPARK-18370][SQL] Add table information to InsertIntoHadoopFsRelationCommand
## What changes were proposed in this pull request?

`InsertIntoHadoopFsRelationCommand` does not keep track of whether it inserts into a table and, if so, which table it inserts into. This can make debugging these statements problematic. This PR adds table information to `InsertIntoHadoopFsRelationCommand`. Explaining the SQL command `insert into prq select * from range(0, 100000)` now yields the following executed plan:

```
== Physical Plan ==
ExecutedCommand
+- InsertIntoHadoopFsRelationCommand file:/dev/assembly/spark-warehouse/prq, ParquetFormat, <function1>, Map(serialization.format -> 1, path -> file:/dev/assembly/spark-warehouse/prq), Append, CatalogTable(
      Table: `default`.`prq`
      Owner: hvanhovell
      Created: Wed Nov 09 17:42:30 CET 2016
      Last Access: Thu Jan 01 01:00:00 CET 1970
      Type: MANAGED
      Schema: [StructField(id,LongType,true)]
      Provider: parquet
      Properties: [transient_lastDdlTime=1478709750]
      Storage(Location: file:/dev/assembly/spark-warehouse/prq, InputFormat: org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat, OutputFormat: org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat, Serde: org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe, Properties: [serialization.format=1]))
   +- Project [id#7L]
      +- Range (0, 100000, step=1, splits=None)
```

## How was this patch tested?

Added extra checks to `ParquetMetastoreSuite`.

Author: Herman van Hovell <[email protected]>

Closes #15832 from hvanhovell/SPARK-18370.
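For reference, a minimal spark-shell sketch that reproduces the explain output above; the table name `prq` and its single `BIGINT` column are taken from the example in the description, everything else is an assumption:

```scala
// Minimal reproduction sketch (assumes a fresh Spark session with a default warehouse dir).
spark.sql("CREATE TABLE prq (id BIGINT) USING parquet")

// EXPLAIN shows the executed plan, which after this change includes the
// CatalogTable of the target table.
spark.sql("EXPLAIN INSERT INTO prq SELECT * FROM range(0, 100000)").show(truncate = false)
```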
1 parent: d4028de · commit: d8b81f7

4 files changed: 12 additions & 7 deletions


sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala

Lines changed: 2 additions & 1 deletion

```diff
@@ -424,7 +424,8 @@ case class DataSource(
         _ => Unit, // No existing table needs to be refreshed.
         options,
         data.logicalPlan,
-        mode)
+        mode,
+        catalogTable)
       sparkSession.sessionState.executePlan(plan).toRdd
       // Replace the schema with that of the DataFrame we just wrote out to avoid re-inferring it.
       copy(userSpecifiedSchema = Some(data.schema.asNullable)).resolveRelation()
```
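For context, the `catalogTable` being passed through here is an existing field on `DataSource` itself; roughly the following shape (field list reconstructed from memory of the Spark sources of this era, so treat it as an assumption rather than a verbatim quote):

```scala
// Approximate shape of DataSource at the time of this commit (assumption).
case class DataSource(
    sparkSession: SparkSession,
    className: String,
    paths: Seq[String] = Nil,
    userSpecifiedSchema: Option[StructType] = None,
    partitionColumns: Seq[String] = Seq.empty,
    bucketSpec: Option[BucketSpec] = None,
    options: Map[String, String] = Map.empty,
    catalogTable: Option[CatalogTable] = None) // forwarded to the insert command above
```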

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala

Lines changed: 3 additions & 2 deletions

```diff
@@ -162,7 +162,7 @@ case class DataSourceAnalysis(conf: CatalystConf) extends Rule[LogicalPlan] {
 
 
     case i @ logical.InsertIntoTable(
-        l @ LogicalRelation(t: HadoopFsRelation, _, _), part, query, overwrite, false)
+        l @ LogicalRelation(t: HadoopFsRelation, _, table), part, query, overwrite, false)
         if query.resolved && t.schema.asNullable == query.schema.asNullable =>
 
       // Sanity checks
@@ -222,7 +222,8 @@ case class DataSourceAnalysis(conf: CatalystConf) extends Rule[LogicalPlan] {
         refreshPartitionsCallback,
         t.options,
         query,
-        mode)
+        mode,
+        table)
 
       insertCmd
   }
```
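The `table` binding introduced in the pattern above is `LogicalRelation`'s third field; for reference, a sketch of the case class being matched (again from memory of the codebase of this era, treat as an assumption):

```scala
// Sketch of the node matched by `l @ LogicalRelation(t: HadoopFsRelation, _, table)`.
case class LogicalRelation(
    relation: BaseRelation,
    expectedOutputAttributes: Option[Seq[Attribute]] = None,
    catalogTable: Option[CatalogTable] = None) // bound to `table`, then passed to the command
  extends LeafNode
```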

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala

Lines changed: 3 additions & 2 deletions

```diff
@@ -23,7 +23,7 @@ import org.apache.hadoop.fs.Path
 
 import org.apache.spark.internal.io.FileCommitProtocol
 import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.catalog.BucketSpec
+import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogTable}
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
@@ -41,7 +41,8 @@ case class InsertIntoHadoopFsRelationCommand(
     refreshFunction: (Seq[TablePartitionSpec]) => Unit,
     options: Map[String, String],
     @transient query: LogicalPlan,
-    mode: SaveMode)
+    mode: SaveMode,
+    catalogTable: Option[CatalogTable])
   extends RunnableCommand {
 
   override protected def innerChildren: Seq[LogicalPlan] = query :: Nil
```

sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala

Lines changed: 4 additions & 2 deletions

```diff
@@ -307,7 +307,8 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {
 
       val df = sql("INSERT INTO TABLE test_insert_parquet SELECT a FROM jt")
       df.queryExecution.sparkPlan match {
-        case ExecutedCommandExec(_: InsertIntoHadoopFsRelationCommand) => // OK
+        case ExecutedCommandExec(cmd: InsertIntoHadoopFsRelationCommand) =>
+          assert(cmd.catalogTable.map(_.identifier.table) === Some("test_insert_parquet"))
         case o => fail("test_insert_parquet should be converted to a " +
           s"${classOf[HadoopFsRelation].getCanonicalName} and " +
           s"${classOf[InsertIntoDataSourceCommand].getCanonicalName} should have been SparkPlan. " +
@@ -337,7 +338,8 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {
 
       val df = sql("INSERT INTO TABLE test_insert_parquet SELECT a FROM jt_array")
       df.queryExecution.sparkPlan match {
-        case ExecutedCommandExec(_: InsertIntoHadoopFsRelationCommand) => // OK
+        case ExecutedCommandExec(cmd: InsertIntoHadoopFsRelationCommand) =>
+          assert(cmd.catalogTable.map(_.identifier.table) === Some("test_insert_parquet"))
         case o => fail("test_insert_parquet should be converted to a " +
           s"${classOf[HadoopFsRelation].getCanonicalName} and " +
           s"${classOf[InsertIntoDataSourceCommand].getCanonicalName} should have been SparkPlan." +
```
