From 5ec7dd6987ff2cdbadda0eb45f6fdd8aacaf92fd Mon Sep 17 00:00:00 2001
From: windpiger
Date: Sun, 22 Jan 2017 17:19:16 +0800
Subject: [PATCH 01/13] [SPARK-19329][SQL] Inserting data into a datasource
 table with a non-existent location should succeed

---
 .../datasources/DataSourceStrategy.scala |  2 +-
 .../sql/hive/execution/HiveDDLSuite.scala | 23 +++++++++++++++++++
 2 files changed, 24 insertions(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index 21b07ee85adc8..65f780e1588b6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -240,7 +240,7 @@ class FindDataSourceTable(sparkSession: SparkSession) extends Rule[LogicalPlan]
           // TODO: improve `InMemoryCatalog` and remove this limitation.
           catalogTable = if (withHiveSupport) Some(table) else None)
 
-        LogicalRelation(dataSource.resolveRelation(), catalogTable = Some(table))
+        LogicalRelation(dataSource.resolveRelation(false), catalogTable = Some(table))
       }
     })
   }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index 41917ccabca58..b4c85ccecadd7 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -1431,4 +1431,27 @@ class HiveDDLSuite
       }
     }
   }
+
+  test("insert data to a table which has altered the table location " +
+    "to a not exist location should success") {
+    withTable("t", "t1") {
+      withTempDir { dir =>
+        spark.sql(
+          s"""create table t(a string, b int)
+             |using parquet
+             |options(path "${dir.getAbsolutePath}")
+           """.stripMargin)
+        var table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
+        assert(table.location == dir.getAbsolutePath)
+
+        var newDir = dir.getAbsolutePath.stripSuffix("/") + "/x"
+        spark.sql(s"alter table t set location '$newDir'")
+        table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
+        assert(table.location == newDir)
+
+        spark.sql("insert into table t select 'c', 1")
+        checkAnswer(spark.table("t"), Row("c", 1) :: Nil)
+      }
+    }
+  }
 }

From 21966484c9b44ca7d509ee017a10175b48300283 Mon Sep 17 00:00:00 2001
From: windpiger
Date: Sun, 22 Jan 2017 22:05:50 +0800
Subject: [PATCH 02/13] modify a test name description

---
 .../org/apache/spark/sql/hive/execution/HiveDDLSuite.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index b4c85ccecadd7..06c93e59c180c 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -1433,7 +1433,7 @@ class HiveDDLSuite
   }
 
   test("insert data to a table which has altered the table location " +
-    "to a not exist location should success") {
+    "to an not exist location should success") {
     withTable("t", "t1") {
       withTempDir { dir =>
         spark.sql(

From abc57ddedde78cfd8e94125416423cbcd4e56f71 Mon Sep 17 00:00:00 2001
From: windpiger
Date: Tue, 24 Jan 2017 09:46:13 +0800
Subject: [PATCH 03/13] add a parameter name

---
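Note on this patch: `DataSource.resolveRelation` takes a `checkFilesExist` flag, and PATCH 01 passes `false` so that a catalog table whose location does not exist yet can still be resolved (the first INSERT creates the directory). This patch only passes that boolean by name so the call site documents itself. A minimal sketch of the idea, using a toy stand-in for the resolver (illustrative only, not Spark's real DataSource internals):

import java.nio.file.{Files, Paths}

object ResolveRelationSketch {
  final case class ToyRelation(path: String)

  // Mirrors the shape of the real call: when checkFilesExist is true,
  // resolving a path that does not exist yet fails.
  def resolveRelation(path: String, checkFilesExist: Boolean = true): ToyRelation = {
    if (checkFilesExist && !Files.exists(Paths.get(path))) {
      throw new IllegalArgumentException(s"Path does not exist: $path")
    }
    ToyRelation(path)
  }

  def main(args: Array[String]): Unit = {
    // A catalog table may point at a directory that the first INSERT will
    // create, so the existence check is skipped when resolving from the catalog.
    println(resolveRelation("/tmp/warehouse/t-not-yet-created", checkFilesExist = false))
  }
}

Writing `resolveRelation(checkFilesExist = false)` instead of a bare `false` is the whole content of this patch; the behavior is unchanged.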
 .../spark/sql/execution/datasources/DataSourceStrategy.scala | 3 ++-
 .../org/apache/spark/sql/hive/execution/HiveDDLSuite.scala | 2 +-
 2 files changed, 3 insertions(+), 2 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index 65f780e1588b6..0290ceb6e7073 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -240,7 +240,8 @@ class FindDataSourceTable(sparkSession: SparkSession) extends Rule[LogicalPlan]
           // TODO: improve `InMemoryCatalog` and remove this limitation.
           catalogTable = if (withHiveSupport) Some(table) else None)
 
-        LogicalRelation(dataSource.resolveRelation(false), catalogTable = Some(table))
+        LogicalRelation(dataSource.resolveRelation(checkFilesExist = false),
+          catalogTable = Some(table))
       }
     })
   }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index 06c93e59c180c..41fe58ca8b243 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -1434,7 +1434,7 @@ class HiveDDLSuite
   test("insert data to a table which has altered the table location " +
     "to an not exist location should success") {
-    withTable("t", "t1") {
+    withTable("t") {
       withTempDir { dir =>
         spark.sql(
           s"""create table t(a string, b int)
             |using parquet
             |options(path "${dir.getAbsolutePath}")

From c3439ffecfcde7ecc06b6dd40e1d085c433eea94 Mon Sep 17 00:00:00 2001
From: windpiger
Date: Sun, 12 Feb 2017 13:20:02 +0800
Subject: [PATCH 04/13] add a test for reading from a non-existent path

---
 .../org/apache/spark/sql/hive/execution/HiveDDLSuite.scala | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index 41fe58ca8b243..d8c14d5d74e2e 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -1446,8 +1446,11 @@ class HiveDDLSuite
         var newDir = dir.getAbsolutePath.stripSuffix("/") + "/x"
         spark.sql(s"alter table t set location '$newDir'")
+
         table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
         assert(table.location == newDir)
+        assert(!new File(newDir).exists())
+        checkAnswer(spark.table("t"), Nil)
 
         spark.sql("insert into table t select 'c', 1")
         checkAnswer(spark.table("t"), Row("c", 1) :: Nil)

From 409f7a4b2c46bc026fea0ad241d1c068d9386aea Mon Sep 17 00:00:00 2001
From: windpiger
Date: Sun, 12 Feb 2017 17:24:17 +0800
Subject: [PATCH 05/13] add more test cases

---
 .../spark/sql/execution/command/ddl.scala |  2 +
 .../hive/execution/HiveTableScanExec.scala | 11 ++-
 .../sql/hive/execution/HiveDDLSuite.scala | 79 ++++++++++++++-----
 3 files changed, 72 insertions(+), 20 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index 82cbb4aa47445..e07259e817ecf 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -754,6 +754,8 @@ case class AlterTableSetLocationCommand(
       // No partition spec is specified, so we set the location for the table itself
       catalog.alterTable(table.withNewStorage(locationUri = Some(location)))
     }
+
+    catalog.refreshTable(table.identifier)
     Seq.empty[Row]
   }
 }
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala
index def6ef3691333..26d18b142b678 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala
@@ -20,13 +20,14 @@ package org.apache.spark.sql.hive.execution
 import scala.collection.JavaConverters._
 
 import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
 import org.apache.hadoop.hive.ql.metadata.{Partition => HivePartition}
 import org.apache.hadoop.hive.serde.serdeConstants
 import org.apache.hadoop.hive.serde2.objectinspector._
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils
 
-import org.apache.spark.rdd.RDD
+import org.apache.spark.rdd.{EmptyRDD, RDD}
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
@@ -141,6 +142,14 @@ case class HiveTableScanExec(
   protected override def doExecute(): RDD[InternalRow] = {
     // Using dummyCallSite, as getCallSite can turn out to be expensive with
     // with multiple partitions.
+    val locationPath = new Path(relation.catalogTable.location)
+    val fs = locationPath.getFileSystem(sparkSession.sessionState.newHadoopConf())
+
+    // if the table location is not exists, return an empty RDD
+    if (!fs.exists(locationPath)) {
+      return new EmptyRDD[InternalRow](sparkSession.sparkContext)
+    }
+
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index d8c14d5d74e2e..efeecc55a8a39 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -35,6 +35,7 @@ import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION
 import org.apache.spark.sql.test.SQLTestUtils
 import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.Utils
 
 class HiveDDLSuite
   extends QueryTest with SQLTestUtils with TestHiveSingleton with BeforeAndAfterEach {
@@ -1432,29 +1433,69 @@ class HiveDDLSuite
     }
   }
 
-  test("insert data to a table which has altered the table location " +
-    "to an not exist location should success") {
-    withTable("t") {
-      withTempDir { dir =>
-        spark.sql(
-          s"""create table t(a string, b int)
-            |using parquet
-            |options(path "${dir.getAbsolutePath}")
+  Seq("parquet", "hive").foreach {
+    source =>
+      test(s"insert data to a $source table which has an not existed location should succeed") {
+        withTable("t") {
+          withTempDir { dir =>
+            val x = dir.getAbsolutePath
+            val y = dir.getCanonicalPath
+            spark.sql(
+              s"""CREATE TABLE t(a string, b int)
+                |USING $source
+                |OPTIONS(path "file:${dir.getCanonicalPath}")
           """.stripMargin)
+            var table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
+            val expectedPath = s"file:${dir.getAbsolutePath.stripSuffix("/")}"
+            assert(table.location.stripSuffix("/") == expectedPath)
+
+            dir.delete
+            assert(!new File(table.location).exists())
+            spark.sql("INSERT INTO TABLE t SELECT 'c', 1")
+            checkAnswer(spark.table("t"), Row("c", 1) :: Nil)
+
+            Utils.deleteRecursively(dir)
+            assert(!new File(table.location).exists())
+            spark.sql("INSERT OVERWRITE TABLE t SELECT 'c', 1")
+            checkAnswer(spark.table("t"), Row("c", 1) :: Nil)
+
+            var newDir = dir.getAbsolutePath.stripSuffix("/") + "/x"
+            spark.sql(s"ALTER TABLE t SET LOCATION '$newDir'")
+
+            table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
+            assert(table.location == newDir)
+            assert(!new File(newDir).exists())
+
+            spark.sql("INSERT INTO TABLE t SELECT 'c', 1")
+            checkAnswer(spark.table("t"), Row("c", 1) :: Nil)
+          }
+        }
+      }
+
+      test(s"read data from a $source table which has an not existed location should succeed") {
+        withTable("t") {
+          withTempDir { dir =>
+            spark.sql(
+              s"""CREATE TABLE t(a string, b int)
+                |USING $source
+                |OPTIONS(path "file:${dir.getAbsolutePath}")
           """.stripMargin)
-        var table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
-        assert(table.location == dir.getAbsolutePath)
+            var table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
+            val expectedPath = s"file:${dir.getAbsolutePath.stripSuffix("/")}"
+            assert(table.location.stripSuffix("/") == expectedPath)
 
-        var newDir = dir.getAbsolutePath.stripSuffix("/") + "/x"
-        spark.sql(s"alter table t set location '$newDir'")
+            dir.delete()
+            checkAnswer(spark.table("t"), Nil)
 
-        table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
-        assert(table.location == newDir)
-        assert(!new File(newDir).exists())
-        checkAnswer(spark.table("t"), Nil)
+            var newDir = dir.getAbsolutePath.stripSuffix("/") + "/x"
+            spark.sql(s"ALTER TABLE t SET LOCATION '$newDir'")
 
-        spark.sql("insert into table t select 'c', 1")
-        checkAnswer(spark.table("t"), Row("c", 1) :: Nil)
+            table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
+            assert(table.location == newDir)
+            assert(!new File(newDir).exists())
+            checkAnswer(spark.table("t"), Nil)
+          }
+        }
       }
-    }
   }
 }

From 542f86ba6d1037153fb9752472f670c3e81fedbf Mon Sep 17 00:00:00 2001
From: windpiger
Date: Sun, 12 Feb 2017 17:27:41 +0800
Subject: [PATCH 06/13] modify a comment

---
 .../apache/spark/sql/hive/execution/HiveTableScanExec.scala | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala
index 26d18b142b678..3b2bc80a59ebe 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala
@@ -140,8 +140,6 @@ case class HiveTableScanExec(
   }
 
   protected override def doExecute(): RDD[InternalRow] = {
-    // Using dummyCallSite, as getCallSite can turn out to be expensive with
-    // with multiple partitions.
     val rdd = if (!relation.hiveQlTable.isPartitioned) {
       Utils.withDummyCallSite(sqlContext.sparkContext) {
         hadoopReader.makeRDDForTable(relation.hiveQlTable)
@@ -150,6 +148,8 @@ case class HiveTableScanExec(
       return new EmptyRDD[InternalRow](sparkSession.sparkContext)
     }
 
+    // Using dummyCallSite, as getCallSite can turn out to be expensive with
+    // with multiple partitions.
     val rdd = if (!relation.hiveQlTable.isPartitioned) {
       Utils.withDummyCallSite(sqlContext.sparkContext) {
         hadoopReader.makeRDDForTable(relation.hiveQlTable)

From 2cbb9d68171bf66c2e1494c3f346b69b04e4b7de Mon Sep 17 00:00:00 2001
From: windpiger
Date: Sun, 12 Feb 2017 17:28:57 +0800
Subject: [PATCH 07/13] remove some test code

---
 .../org/apache/spark/sql/hive/execution/HiveDDLSuite.scala | 2 --
 1 file changed, 2 deletions(-)

diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index efeecc55a8a39..a13dd85a615b1 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -1438,8 +1438,6 @@ class HiveDDLSuite
       test(s"insert data to a $source table which has an not existed location should succeed") {
        withTable("t") {
          withTempDir { dir =>
-            val x = dir.getAbsolutePath
-            val y = dir.getCanonicalPath
            spark.sql(
              s"""CREATE TABLE t(a string, b int)
                |USING $source

From c3e87d356cf99e40e853cb3845fe4b93462c457b Mon Sep 17 00:00:00 2001
From: windpiger
Date: Sun, 12 Feb 2017 19:57:02 +0800
Subject: [PATCH 08/13] modify a comment

---
 .../org/apache/spark/sql/hive/execution/HiveTableScanExec.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala
index 3b2bc80a59ebe..c6e0855d0c0bc 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala
@@ -143,7 +143,7 @@ case class HiveTableScanExec(
     val locationPath = new Path(relation.catalogTable.location)
     val fs = locationPath.getFileSystem(sparkSession.sessionState.newHadoopConf())
 
-    // if the table location is not exists, return an empty RDD
+    // if the table location does not exists, return an empty RDD
     if (!fs.exists(locationPath)) {
       return new EmptyRDD[InternalRow](sparkSession.sparkContext)
     }

From 5ebd596fbe4cd962460d66791550a3face3c4b2f Mon Sep 17 00:00:00 2001
From: windpiger
Date: Mon, 13 Feb 2017 14:32:38 +0800
Subject: [PATCH 09/13] remove the Hive SerDe-related changes and add
 partition test cases

---
 .../spark/sql/execution/command/ddl.scala |   1 -
 .../hive/execution/HiveTableScanExec.scala |   8 -
 .../sql/hive/execution/HiveDDLSuite.scala | 173 ++++++++++++------
 3 files changed, 119 insertions(+), 63 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index e07259e817ecf..1988e7ddae729 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -755,7 +755,6 @@ case class AlterTableSetLocationCommand(
       catalog.alterTable(table.withNewStorage(locationUri = Some(location)))
     }
 
-    catalog.refreshTable(table.identifier)
     Seq.empty[Row]
   }
 }
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala
index c6e0855d0c0bc..3169baeb62e8f 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala
@@ -140,14 +140,6 @@ case class HiveTableScanExec(
   }
 
   protected override def doExecute(): RDD[InternalRow] = {
-    val locationPath = new Path(relation.catalogTable.location)
-    val fs = locationPath.getFileSystem(sparkSession.sessionState.newHadoopConf())
-
-    // if the table location does not exists, return an empty RDD
-    if (!fs.exists(locationPath)) {
-      return new EmptyRDD[InternalRow](sparkSession.sparkContext)
-    }
-
     // Using dummyCallSite, as getCallSite can turn out to be expensive with
     // with multiple partitions.
     val rdd = if (!relation.hiveQlTable.isPartitioned) {
       Utils.withDummyCallSite(sqlContext.sparkContext) {
         hadoopReader.makeRDDForTable(relation.hiveQlTable)
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index a13dd85a615b1..9e6c703667fee 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -1433,67 +1433,132 @@ class HiveDDLSuite
     }
   }
 
-  Seq("parquet", "hive").foreach {
-    source =>
-      test(s"insert data to a $source table which has an not existed location should succeed") {
-       withTable("t") {
-         withTempDir { dir =>
-           spark.sql(
-             s"""CREATE TABLE t(a string, b int)
-               |USING $source
-               |OPTIONS(path "file:${dir.getCanonicalPath}")
+  test("insert data to a data source table which has a not existed location should succeed") {
+    withTable("t") {
+      withTempDir { dir =>
+        spark.sql(
+          s"""CREATE TABLE t(a string, b int)
+            |USING parquet
+            |OPTIONS(path "file:${dir.getCanonicalPath}")
           """.stripMargin)
-           var table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
-           val expectedPath = s"file:${dir.getAbsolutePath.stripSuffix("/")}"
-           assert(table.location.stripSuffix("/") == expectedPath)
-
-           dir.delete
-           assert(!new File(table.location).exists())
-           spark.sql("INSERT INTO TABLE t SELECT 'c', 1")
-           checkAnswer(spark.table("t"), Row("c", 1) :: Nil)
-
-           Utils.deleteRecursively(dir)
-           assert(!new File(table.location).exists())
-           spark.sql("INSERT OVERWRITE TABLE t SELECT 'c', 1")
-           checkAnswer(spark.table("t"), Row("c", 1) :: Nil)
-
-           var newDir = dir.getAbsolutePath.stripSuffix("/") + "/x"
-           spark.sql(s"ALTER TABLE t SET LOCATION '$newDir'")
-
-           table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
-           assert(table.location == newDir)
-           assert(!new File(newDir).exists())
-
-           spark.sql("INSERT INTO TABLE t SELECT 'c', 1")
-           checkAnswer(spark.table("t"), Row("c", 1) :: Nil)
-         }
-       }
-      }
+        var table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
+        val expectedPath = s"file:${dir.getAbsolutePath.stripSuffix("/")}"
+        assert(table.location.stripSuffix("/") == expectedPath)
+
+        dir.delete
+        assert(!new File(table.location).exists())
+        spark.sql("INSERT INTO TABLE t SELECT 'c', 1")
+        checkAnswer(spark.table("t"), Row("c", 1) :: Nil)
+
+        Utils.deleteRecursively(dir)
+        assert(!new File(table.location).exists())
+        spark.sql("INSERT OVERWRITE TABLE t SELECT 'c', 1")
+        checkAnswer(spark.table("t"), Row("c", 1) :: Nil)
+
+        var newDir = dir.getAbsolutePath.stripSuffix("/") + "/x"
+        spark.sql(s"ALTER TABLE t SET LOCATION '$newDir'")
+        spark.sessionState.catalog.refreshTable(TableIdentifier("t"))
+
+        table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
+        assert(table.location == newDir)
+        assert(!new File(newDir).exists())
+
+        spark.sql("INSERT INTO TABLE t SELECT 'c', 1")
+        checkAnswer(spark.table("t"), Row("c", 1) :: Nil)
+      }
+    }
+  }
+
+  test("insert into a data source table with no existed partition location should succeed") {
+    withTable("t") {
+      withTempDir { dir =>
+        spark.sql(
+          s"""CREATE TABLE t(a int, b int, c int, d int)
+            |USING parquet
+            |PARTITIONED BY(a, b)
+            |LOCATION "file:${dir.getCanonicalPath}"
+          """.stripMargin)
+        var table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
+        val expectedPath = s"file:${dir.getAbsolutePath.stripSuffix("/")}"
+        assert(table.location.stripSuffix("/") == expectedPath)
+
+        spark.sql("INSERT INTO TABLE t PARTITION(a=1, b=2) SELECT 3, 4")
+        checkAnswer(spark.table("t"), Row(3, 4, 1, 2) :: Nil)
+
+        val partLoc = new File(s"${dir.getAbsolutePath}/a=1")
+        Utils.deleteRecursively(partLoc)
+        assert(!partLoc.exists())
+        // insert overwrite into a partition which location has been deleted.
+        spark.sql("INSERT OVERWRITE TABLE t PARTITION(a=1, b=2) SELECT 7, 8")
+        checkAnswer(spark.table("t"), Row(7, 8, 1, 2) :: Nil)
+
+        val newDir = dir.getAbsolutePath.stripSuffix("/") + "/x"
+        spark.sql(s"ALTER TABLE t PARTITION(a=1, b=2) SET LOCATION '$newDir'")
+        assert(!new File(newDir).exists())
+
+        // insert into a partition which location does not exists.
+        spark.sql("INSERT INTO TABLE t PARTITION(a=1, b=2) SELECT 9, 10")
+        checkAnswer(spark.table("t"), Row(9, 10, 1, 2) :: Nil)
+      }
+    }
+  }
 
-      test(s"read data from a $source table which has an not existed location should succeed") {
-       withTable("t") {
-         withTempDir { dir =>
-           spark.sql(
-             s"""CREATE TABLE t(a string, b int)
-               |USING $source
-               |OPTIONS(path "file:${dir.getAbsolutePath}")
+  test("read data from a data source table which has a not existed location should succeed") {
+    withTable("t") {
+      withTempDir { dir =>
+        spark.sql(
+          s"""CREATE TABLE t(a string, b int)
+            |USING parquet
+            |OPTIONS(path "file:${dir.getAbsolutePath}")
           """.stripMargin)
-           var table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
-           val expectedPath = s"file:${dir.getAbsolutePath.stripSuffix("/")}"
-           assert(table.location.stripSuffix("/") == expectedPath)
+        var table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
+        val expectedPath = s"file:${dir.getAbsolutePath.stripSuffix("/")}"
+        assert(table.location.stripSuffix("/") == expectedPath)
 
-           dir.delete()
-           checkAnswer(spark.table("t"), Nil)
+        dir.delete()
+        checkAnswer(spark.table("t"), Nil)
 
-           var newDir = dir.getAbsolutePath.stripSuffix("/") + "/x"
-           spark.sql(s"ALTER TABLE t SET LOCATION '$newDir'")
+        var newDir = dir.getAbsolutePath.stripSuffix("/") + "/x"
+        spark.sql(s"ALTER TABLE t SET LOCATION '$newDir'")
 
-           table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
-           assert(table.location == newDir)
-           assert(!new File(newDir).exists())
-           checkAnswer(spark.table("t"), Nil)
-         }
-       }
+        table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
+        assert(table.location == newDir)
+        assert(!new File(newDir).exists())
+        checkAnswer(spark.table("t"), Nil)
+      }
+    }
+  }
+
+  test("read data from a data source table with no existed partition location should succeed") {
+    withTable("t") {
+      withTempDir { dir =>
+        spark.sql(
+          s"""CREATE TABLE t(a int, b int, c int, d int)
+            |USING parquet
+            |PARTITIONED BY(a, b)
+            |LOCATION "file:${dir.getCanonicalPath}"
+          """.stripMargin)
+        var table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
+
+        spark.sql("INSERT INTO TABLE t PARTITION(a=1, b=2) SELECT 3, 4")
+        checkAnswer(spark.table("t"), Row(3, 4, 1, 2) :: Nil)
+
+        val newDir = dir.getAbsolutePath.stripSuffix("/") + "/x"
+        val newDirFile = new File(newDir)
+        spark.sql(s"ALTER TABLE t PARTITION(a=1, b=2) SET LOCATION '$newDir'")
+        assert(!newDirFile.exists())
+        // select from a partition which location has changed to a not existed location
+        checkAnswer(spark.sql("select * from t where a=1 and b=2"), Nil)
+
+        spark.sql("INSERT INTO TABLE t PARTITION(a=1, b=2) SELECT 5, 6")
+        checkAnswer(spark.table("t"), Row(5, 6, 1, 2) :: Nil)
+        // select from a partition which location has been deleted.
+        Utils.deleteRecursively(newDirFile)
+        assert(!newDirFile.exists())
+        spark.sql("REFRESH TABLE t")
+        checkAnswer(spark.sql("select * from t where a=1 and b=2"), Nil)
+      }
+    }
+  }
-      }
   }
 }

From 334e89fe7258ab6a6773d534bee469cda7cd6d0c Mon Sep 17 00:00:00 2001
From: windpiger
Date: Mon, 13 Feb 2017 14:35:15 +0800
Subject: [PATCH 10/13] remove some imports

---
 .../scala/org/apache/spark/sql/execution/command/ddl.scala | 1 -
 .../apache/spark/sql/hive/execution/HiveTableScanExec.scala | 3 +--
 2 files changed, 1 insertion(+), 3 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index 1988e7ddae729..82cbb4aa47445 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -754,7 +754,6 @@ case class AlterTableSetLocationCommand(
       // No partition spec is specified, so we set the location for the table itself
       catalog.alterTable(table.withNewStorage(locationUri = Some(location)))
     }
-
     Seq.empty[Row]
   }
 }
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala
index 3169baeb62e8f..def6ef3691333 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala
@@ -20,14 +20,13 @@ package org.apache.spark.sql.hive.execution
 import scala.collection.JavaConverters._
 
 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.Path
 import org.apache.hadoop.hive.ql.metadata.{Partition => HivePartition}
 import org.apache.hadoop.hive.serde.serdeConstants
 import org.apache.hadoop.hive.serde2.objectinspector._
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils
 
-import org.apache.spark.rdd.{EmptyRDD, RDD}
+import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._

From b238e8d34fc08b3f641c610dada582ca3ee2be2b Mon Sep 17 00:00:00 2001
From: windpiger
Date: Mon, 13 Feb 2017 17:53:45 +0800
Subject: [PATCH 11/13] move test cases to DDLSuite

---
 .../sql/execution/command/DDLSuite.scala | 123 +++++++++++++++++
 .../sql/hive/execution/HiveDDLSuite.scala | 129 ------------------
 2 files changed, 123 insertions(+), 129 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index b4c9e276ece7a..73c619ff7c4d0 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -1816,4 +1816,127 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
       }
     }
   }
+
+  test("insert data to a data source table which has a not existed location should succeed") {
+    withTable("t") {
+      withTempDir { dir =>
+        spark.sql(
+          s"""
+            |CREATE TABLE t(a string, b int)
+            |USING parquet
+            |OPTIONS(path "file:${dir.getCanonicalPath}")
+           """.stripMargin)
+        val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
+        val expectedPath = s"file:${dir.getAbsolutePath.stripSuffix("/")}"
+        assert(table.location.stripSuffix("/") == expectedPath)
+
+        dir.delete
+        val tableLocFile = new File(table.location.stripPrefix("file:"))
+        assert(!tableLocFile.exists)
+        spark.sql("INSERT INTO TABLE t SELECT 'c', 1")
+        assert(tableLocFile.exists)
+        checkAnswer(spark.table("t"), Row("c", 1) :: Nil)
+
+        Utils.deleteRecursively(dir)
+        assert(!tableLocFile.exists)
+        spark.sql("INSERT OVERWRITE TABLE t SELECT 'c', 1")
+        assert(tableLocFile.exists)
+        checkAnswer(spark.table("t"), Row("c", 1) :: Nil)
+
+        val newDir = dir.getAbsolutePath.stripSuffix("/") + "/x"
+        val newDirFile = new File(newDir)
+        spark.sql(s"ALTER TABLE t SET LOCATION '$newDir'")
+        spark.sessionState.catalog.refreshTable(TableIdentifier("t"))
+
+        val table1 = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
+        assert(table1.location == newDir)
+        assert(!newDirFile.exists)
+
+        spark.sql("INSERT INTO TABLE t SELECT 'c', 1")
+        assert(newDirFile.exists)
+        checkAnswer(spark.table("t"), Row("c", 1) :: Nil)
+      }
+    }
+  }
+
+  test("insert into a data source table with no existed partition location should succeed") {
+    withTable("t") {
+      withTempDir { dir =>
+        spark.sql(
+          s"""
+            |CREATE TABLE t(a int, b int, c int, d int)
+            |USING parquet
+            |PARTITIONED BY(a, b)
+            |LOCATION "file:${dir.getCanonicalPath}"
+           """.stripMargin)
+        val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
+        val expectedPath = s"file:${dir.getAbsolutePath.stripSuffix("/")}"
+        assert(table.location.stripSuffix("/") == expectedPath)
+
+        spark.sql("INSERT INTO TABLE t PARTITION(a=1, b=2) SELECT 3, 4")
+        checkAnswer(spark.table("t"), Row(3, 4, 1, 2) :: Nil)
+
+        val partLoc = new File(s"${dir.getAbsolutePath}/a=1")
+        Utils.deleteRecursively(partLoc)
+        assert(!partLoc.exists())
+        // insert overwrite into a partition which location has been deleted.
+ spark.sql("INSERT OVERWRITE TABLE t PARTITION(a=1, b=2) SELECT 7, 8") + assert(partLoc.exists()) + checkAnswer(spark.table("t"), Row(7, 8, 1, 2) :: Nil) + + // TODO:insert into a partition after alter the partition location by alter command + } + } + } + + test("read data from a data source table which has a not existed location should succeed") { + withTable("t") { + withTempDir { dir => + spark.sql( + s""" + |CREATE TABLE t(a string, b int) + |USING parquet + |OPTIONS(path "file:${dir.getAbsolutePath}") + """.stripMargin) + val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t")) + val expectedPath = s"file:${dir.getAbsolutePath.stripSuffix("/")}" + assert(table.location.stripSuffix("/") == expectedPath) + + dir.delete() + checkAnswer(spark.table("t"), Nil) + + val newDir = dir.getAbsolutePath.stripSuffix("/") + "/x" + spark.sql(s"ALTER TABLE t SET LOCATION '$newDir'") + + val table1 = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t")) + assert(table1.location == newDir) + assert(!new File(newDir).exists()) + checkAnswer(spark.table("t"), Nil) + } + } + } + + test("read data from a data source table with no existed partition location should succeed") { + withTable("t") { + withTempDir { dir => + spark.sql( + s""" + |CREATE TABLE t(a int, b int, c int, d int) + |USING parquet + |PARTITIONED BY(a, b) + |LOCATION "file:${dir.getCanonicalPath}" + """.stripMargin) + val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t")) + + spark.sql("INSERT INTO TABLE t PARTITION(a=1, b=2) SELECT 3, 4") + checkAnswer(spark.table("t"), Row(3, 4, 1, 2) :: Nil) + + // select from a partition which location has been deleted. + Utils.deleteRecursively(dir) + assert(!dir.exists()) + spark.sql("REFRESH TABLE t") + checkAnswer(spark.sql("select * from t where a=1 and b=2"), Nil) + } + } + } } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala index 9e6c703667fee..3c95dab9b2da3 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala @@ -1432,133 +1432,4 @@ class HiveDDLSuite } } } - - test("insert data to a data source table which has a not existed location should succeed") { - withTable("t") { - withTempDir { dir => - spark.sql( - s"""CREATE TABLE t(a string, b int) - |USING parquet - |OPTIONS(path "file:${dir.getCanonicalPath}") - """.stripMargin) - var table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t")) - val expectedPath = s"file:${dir.getAbsolutePath.stripSuffix("/")}" - assert(table.location.stripSuffix("/") == expectedPath) - - dir.delete - assert(!new File(table.location).exists()) - spark.sql("INSERT INTO TABLE t SELECT 'c', 1") - checkAnswer(spark.table("t"), Row("c", 1) :: Nil) - - Utils.deleteRecursively(dir) - assert(!new File(table.location).exists()) - spark.sql("INSERT OVERWRITE TABLE t SELECT 'c', 1") - checkAnswer(spark.table("t"), Row("c", 1) :: Nil) - - var newDir = dir.getAbsolutePath.stripSuffix("/") + "/x" - spark.sql(s"ALTER TABLE t SET LOCATION '$newDir'") - spark.sessionState.catalog.refreshTable(TableIdentifier("t")) - - table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t")) - assert(table.location == newDir) - assert(!new File(newDir).exists()) - - spark.sql("INSERT INTO TABLE t SELECT 'c', 1") - checkAnswer(spark.table("t"), Row("c", 1) :: Nil) - } - 
} - } - - test("insert into a data source table with no existed partition location should succeed") { - withTable("t") { - withTempDir { dir => - spark.sql( - s"""CREATE TABLE t(a int, b int, c int, d int) - |USING parquet - |PARTITIONED BY(a, b) - |LOCATION "file:${dir.getCanonicalPath}" - """.stripMargin) - var table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t")) - val expectedPath = s"file:${dir.getAbsolutePath.stripSuffix("/")}" - assert(table.location.stripSuffix("/") == expectedPath) - - spark.sql("INSERT INTO TABLE t PARTITION(a=1, b=2) SELECT 3, 4") - checkAnswer(spark.table("t"), Row(3, 4, 1, 2) :: Nil) - - val partLoc = new File(s"${dir.getAbsolutePath}/a=1") - Utils.deleteRecursively(partLoc) - assert(!partLoc.exists()) - // insert overwrite into a partition which location has been deleted. - spark.sql("INSERT OVERWRITE TABLE t PARTITION(a=1, b=2) SELECT 7, 8") - checkAnswer(spark.table("t"), Row(7, 8, 1, 2) :: Nil) - - val newDir = dir.getAbsolutePath.stripSuffix("/") + "/x" - spark.sql(s"ALTER TABLE t PARTITION(a=1, b=2) SET LOCATION '$newDir'") - assert(!new File(newDir).exists()) - - // insert into a partition which location does not exists. - spark.sql("INSERT INTO TABLE t PARTITION(a=1, b=2) SELECT 9, 10") - checkAnswer(spark.table("t"), Row(9, 10, 1, 2) :: Nil) - } - } - } - - test("read data from a data source table which has a not existed location should succeed") { - withTable("t") { - withTempDir { dir => - spark.sql( - s"""CREATE TABLE t(a string, b int) - |USING parquet - |OPTIONS(path "file:${dir.getAbsolutePath}") - """.stripMargin) - var table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t")) - val expectedPath = s"file:${dir.getAbsolutePath.stripSuffix("/")}" - assert(table.location.stripSuffix("/") == expectedPath) - - dir.delete() - checkAnswer(spark.table("t"), Nil) - - var newDir = dir.getAbsolutePath.stripSuffix("/") + "/x" - spark.sql(s"ALTER TABLE t SET LOCATION '$newDir'") - - table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t")) - assert(table.location == newDir) - assert(!new File(newDir).exists()) - checkAnswer(spark.table("t"), Nil) - } - } - } - - test("read data from a data source table with no existed partition location should succeed") { - withTable("t") { - withTempDir { dir => - spark.sql( - s"""CREATE TABLE t(a int, b int, c int, d int) - |USING parquet - |PARTITIONED BY(a, b) - |LOCATION "file:${dir.getCanonicalPath}" - """.stripMargin) - var table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t")) - - spark.sql("INSERT INTO TABLE t PARTITION(a=1, b=2) SELECT 3, 4") - checkAnswer(spark.table("t"), Row(3, 4, 1, 2) :: Nil) - - val newDir = dir.getAbsolutePath.stripSuffix("/") + "/x" - val newDirFile = new File(newDir) - spark.sql(s"ALTER TABLE t PARTITION(a=1, b=2) SET LOCATION '$newDir'") - assert(!newDirFile.exists()) - // select from a partition which location has changed to a not existed location - checkAnswer(spark.sql("select * from t where a=1 and b=2"), Nil) - - spark.sql("INSERT INTO TABLE t PARTITION(a=1, b=2) SELECT 5, 6") - checkAnswer(spark.table("t"), Row(5, 6, 1, 2) :: Nil) - // select from a partition which location has been deleted. 
-        Utils.deleteRecursively(newDirFile)
-        assert(!newDirFile.exists())
-        spark.sql("REFRESH TABLE t")
-        checkAnswer(spark.sql("select * from t where a=1 and b=2"), Nil)
-      }
-    }
-  }
-
 }

From dee844ce73defc68116913569733275a2f1d5529 Mon Sep 17 00:00:00 2001
From: windpiger
Date: Mon, 13 Feb 2017 18:05:53 +0800
Subject: [PATCH 12/13] remove a redundant import

---
 .../scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala | 1 -
 1 file changed, 1 deletion(-)

diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index 3c95dab9b2da3..41917ccabca58 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -35,7 +35,6 @@ import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION
 import org.apache.spark.sql.test.SQLTestUtils
 import org.apache.spark.sql.types.StructType
-import org.apache.spark.util.Utils
 
 class HiveDDLSuite
   extends QueryTest with SQLTestUtils with TestHiveSingleton with BeforeAndAfterEach {

From 0d947a55a80ecc63eb15092c29b2c44aeeb197e5 Mon Sep 17 00:00:00 2001
From: windpiger
Date: Tue, 14 Feb 2017 14:11:31 +0800
Subject: [PATCH 13/13] fix some code style

---
 .../sql/execution/command/DDLSuite.scala | 38 +++++++++----------
 1 file changed, 17 insertions(+), 21 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index 73c619ff7c4d0..d44688ab80137 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -1822,12 +1822,12 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
       withTempDir { dir =>
         spark.sql(
           s"""
-            |CREATE TABLE t(a string, b int)
-            |USING parquet
-            |OPTIONS(path "file:${dir.getCanonicalPath}")
+             |CREATE TABLE t(a string, b int)
+             |USING parquet
+             |OPTIONS(path "$dir")
           """.stripMargin)
         val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
-        val expectedPath = s"file:${dir.getAbsolutePath.stripSuffix("/")}"
+        val expectedPath = dir.getAbsolutePath.stripSuffix("/")
         assert(table.location.stripSuffix("/") == expectedPath)
 
         dir.delete
@@ -1864,13 +1864,13 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
       withTempDir { dir =>
         spark.sql(
          s"""
-            |CREATE TABLE t(a int, b int, c int, d int)
-            |USING parquet
-            |PARTITIONED BY(a, b)
-            |LOCATION "file:${dir.getCanonicalPath}"
+             |CREATE TABLE t(a int, b int, c int, d int)
+             |USING parquet
+             |PARTITIONED BY(a, b)
+             |LOCATION "$dir"
           """.stripMargin)
         val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
-        val expectedPath = s"file:${dir.getAbsolutePath.stripSuffix("/")}"
+        val expectedPath = dir.getAbsolutePath.stripSuffix("/")
         assert(table.location.stripSuffix("/") == expectedPath)
 
         spark.sql("INSERT INTO TABLE t PARTITION(a=1, b=2) SELECT 3, 4")
@@ -1883,8 +1883,6 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
         spark.sql("INSERT OVERWRITE TABLE t PARTITION(a=1, b=2) SELECT 7, 8")
         assert(partLoc.exists())
         checkAnswer(spark.table("t"), Row(7, 8, 1, 2) :: Nil)
-
-        // TODO:insert into a partition after alter the partition location by alter command
       }
     }
   }
@@ -1894,12 +1892,12 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
       withTempDir { dir =>
         spark.sql(
           s"""
-            |CREATE TABLE t(a string, b int)
-            |USING parquet
-            |OPTIONS(path "file:${dir.getAbsolutePath}")
+             |CREATE TABLE t(a string, b int)
+             |USING parquet
+             |OPTIONS(path "$dir")
           """.stripMargin)
         val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
-        val expectedPath = s"file:${dir.getAbsolutePath.stripSuffix("/")}"
+        val expectedPath = dir.getAbsolutePath.stripSuffix("/")
         assert(table.location.stripSuffix("/") == expectedPath)
 
         dir.delete()
@@ -1921,13 +1919,11 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
       withTempDir { dir =>
         spark.sql(
           s"""
-            |CREATE TABLE t(a int, b int, c int, d int)
-            |USING parquet
-            |PARTITIONED BY(a, b)
-            |LOCATION "file:${dir.getCanonicalPath}"
+             |CREATE TABLE t(a int, b int, c int, d int)
+             |USING parquet
+             |PARTITIONED BY(a, b)
+             |LOCATION "$dir"
           """.stripMargin)
-        val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
-
         spark.sql("INSERT INTO TABLE t PARTITION(a=1, b=2) SELECT 3, 4")
         checkAnswer(spark.table("t"), Row(3, 4, 1, 2) :: Nil)