From cb983756f7fb270c545f90a98d03e0db3ccc0bd9 Mon Sep 17 00:00:00 2001 From: windpiger Date: Mon, 13 Feb 2017 15:50:55 +0800 Subject: [PATCH 01/14] [SPARK-19575][SQL] Reading from or writing to a Hive serde table with a non-pre-existing location should succeed --- .../hive/execution/HiveTableScanExec.scala | 12 +- .../sql/hive/execution/HiveDDLSuite.scala | 146 ++++++++++++++++++ 2 files changed, 157 insertions(+), 1 deletion(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala index 140c352fa6f8d..e3c656d96028d 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala @@ -20,13 +20,14 @@ package org.apache.spark.sql.hive.execution import scala.collection.JavaConverters._ import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.Path import org.apache.hadoop.hive.ql.metadata.{Partition => HivePartition} import org.apache.hadoop.hive.serde.serdeConstants import org.apache.hadoop.hive.serde2.objectinspector._ import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils -import org.apache.spark.rdd.RDD +import org.apache.spark.rdd.{EmptyRDD, RDD} import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ @@ -139,6 +140,15 @@ case class HiveTableScanExec( } protected override def doExecute(): RDD[InternalRow] = { + // Using dummyCallSite, as getCallSite can turn out to be expensive with + // with multiple partitions. + val locationPath = new Path(relation.catalogTable.location) + val fs = locationPath.getFileSystem(sparkSession.sessionState.newHadoopConf()) + + // if the table location is not exists, return an empty RDD + if (!fs.exists(locationPath)) { + return new EmptyRDD[InternalRow](sparkSession.sparkContext) + } // Using dummyCallSite, as getCallSite can turn out to be expensive with // with multiple partitions. 
val rdd = if (!relation.hiveQlTable.isPartitioned) { diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala index 9d9f3a620d51b..99de85dbfe68d 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala @@ -35,6 +35,7 @@ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION import org.apache.spark.sql.test.SQLTestUtils import org.apache.spark.sql.types.StructType +import org.apache.spark.util.Utils class HiveDDLSuite extends QueryTest with SQLTestUtils with TestHiveSingleton with BeforeAndAfterEach { @@ -1494,4 +1495,149 @@ class HiveDDLSuite } } } + + test("insert data to a hive serde table which has a not existed location should succeed") { + withTable("t") { + withTempDir { dir => + spark.sql( + s""" + |CREATE TABLE t(a string, b int) + |USING hive + |OPTIONS(path "file:${dir.getCanonicalPath}") + """.stripMargin) + val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t")) + val expectedPath = s"file:${dir.getAbsolutePath.stripSuffix("/")}" + assert(table.location.stripSuffix("/") == expectedPath) + + val tableLocFile = new File(table.location.stripPrefix("file:")) + tableLocFile.delete() + assert(!tableLocFile.exists()) + spark.sql("INSERT INTO TABLE t SELECT 'c', 1") + assert(tableLocFile.exists()) + checkAnswer(spark.table("t"), Row("c", 1) :: Nil) + + Utils.deleteRecursively(dir) + assert(!tableLocFile.exists()) + spark.sql("INSERT OVERWRITE TABLE t SELECT 'c', 1") + assert(tableLocFile.exists()) + checkAnswer(spark.table("t"), Row("c", 1) :: Nil) + + val newDir = dir.getAbsolutePath.stripSuffix("/") + "/x" + val newDirFile = new File(newDir) + spark.sql(s"ALTER TABLE t SET LOCATION '$newDir'") + spark.sessionState.catalog.refreshTable(TableIdentifier("t")) + + val table1 = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t")) + assert(table1.location == newDir) + assert(!newDirFile.exists()) + + spark.sql("INSERT INTO TABLE t SELECT 'c', 1") + checkAnswer(spark.table("t"), Row("c", 1) :: Nil) + assert(newDirFile.exists()) + } + } + } + + test("insert into a hive serde table with no existed partition location should succeed") { + withTable("t") { + withTempDir { dir => + spark.sql( + s""" + |CREATE TABLE t(a int, b int, c int, d int) + |USING hive + |PARTITIONED BY(a, b) + |LOCATION "file:${dir.getCanonicalPath}" + """.stripMargin) + val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t")) + val expectedPath = s"file:${dir.getAbsolutePath.stripSuffix("/")}" + assert(table.location.stripSuffix("/") == expectedPath) + + spark.sql("INSERT INTO TABLE t PARTITION(a=1, b=2) SELECT 3, 4") + checkAnswer(spark.table("t"), Row(3, 4, 1, 2) :: Nil) + + val partLoc = new File(s"${dir.getAbsolutePath}/a=1") + Utils.deleteRecursively(partLoc) + assert(!partLoc.exists()) + // insert overwrite into a partition which location has been deleted. + spark.sql("INSERT OVERWRITE TABLE t PARTITION(a=1, b=2) SELECT 7, 8") + assert(partLoc.exists()) + checkAnswer(spark.table("t"), Row(7, 8, 1, 2) :: Nil) + + val newDir = dir.getAbsolutePath.stripSuffix("/") + "/x" + val newDirFile = new File(newDir) + spark.sql(s"ALTER TABLE t PARTITION(a=1, b=2) SET LOCATION '$newDir'") + assert(!newDirFile.exists()) + + // insert into a partition which location does not exists. 
+ spark.sql("INSERT INTO TABLE t PARTITION(a=1, b=2) SELECT 9, 10") + assert(newDirFile.exists()) + checkAnswer(spark.table("t"), Row(9, 10, 1, 2) :: Nil) + } + } + } + + test("read data from a hive serde table which has a not existed location should succeed") { + withTable("t") { + withTempDir { dir => + spark.sql( + s""" + |CREATE TABLE t(a string, b int) + |USING hive + |OPTIONS(path "file:${dir.getAbsolutePath}") + """.stripMargin) + val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t")) + val expectedPath = s"file:${dir.getAbsolutePath.stripSuffix("/")}" + assert(table.location.stripSuffix("/") == expectedPath) + + dir.delete() + checkAnswer(spark.table("t"), Nil) + + val newDir = dir.getAbsolutePath.stripSuffix("/") + "/x" + spark.sql(s"ALTER TABLE t SET LOCATION '$newDir'") + + val table1 = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t")) + assert(table1.location == newDir) + assert(!new File(newDir).exists()) + checkAnswer(spark.table("t"), Nil) + } + } + } + + test("read data from a hive serde table with no existed partition location should succeed") { + withTable("t") { + withTempDir { dir => + spark.sql( + s""" + |CREATE TABLE t(a int, b int, c int, d int) + |USING hive + |PARTITIONED BY(a, b) + |LOCATION "file:${dir.getCanonicalPath}" + """.stripMargin) + val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t")) + + spark.sql("INSERT INTO TABLE t PARTITION(a=1, b=2) SELECT 3, 4") + checkAnswer(spark.table("t"), Row(3, 4, 1, 2) :: Nil) + + val newDir = dir.getAbsolutePath.stripSuffix("/") + "/x" + val newDirFile = new File(newDir) + spark.sql(s"ALTER TABLE t PARTITION(a=1, b=2) SET LOCATION '$newDir'") + assert(!newDirFile.exists()) + // select from a partition which location has changed to a not existed location + withSQLConf(SQLConf.HIVE_VERIFY_PARTITION_PATH.key -> "true") { + checkAnswer(spark.sql("select * from t where a=1 and b=2"), Nil) + } + + spark.sql("INSERT INTO TABLE t PARTITION(a=1, b=2) SELECT 5, 6") + checkAnswer(spark.table("t"), Row(5, 6, 1, 2) :: Nil) + assert(newDirFile.exists()) + + // select from a partition which location has been deleted. + Utils.deleteRecursively(newDirFile) + assert(!newDirFile.exists()) + withSQLConf(SQLConf.HIVE_VERIFY_PARTITION_PATH.key -> "true") { + checkAnswer(spark.sql("select * from t where a=1 and b=2"), Nil) + } + } + } + } } From 401e86d585ea489fd40d0255896e10d8d4e25570 Mon Sep 17 00:00:00 2001 From: windpiger Date: Mon, 13 Feb 2017 19:01:18 +0800 Subject: [PATCH 02/14] remove some comment --- .../org/apache/spark/sql/hive/execution/HiveTableScanExec.scala | 2 -- 1 file changed, 2 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala index e3c656d96028d..b03f215ec9e5b 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala @@ -140,8 +140,6 @@ case class HiveTableScanExec( } protected override def doExecute(): RDD[InternalRow] = { - // Using dummyCallSite, as getCallSite can turn out to be expensive with - // with multiple partitions. 
val locationPath = new Path(relation.catalogTable.location) val fs = locationPath.getFileSystem(sparkSession.sessionState.newHadoopConf()) From 4493a8f96320720e82dd8a66f61a3b4ebf920116 Mon Sep 17 00:00:00 2001 From: windpiger Date: Tue, 21 Feb 2017 10:16:20 +0800 Subject: [PATCH 03/14] mv the logic to makeRDDForTable --- .../apache/spark/sql/hive/TableReader.scala | 38 ++++++++----- .../hive/execution/HiveTableScanExec.scala | 7 --- .../sql/hive/execution/HiveDDLSuite.scala | 55 +++++++++---------- 3 files changed, 50 insertions(+), 50 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala index b4b63032ab261..38d4b78084da3 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala @@ -114,22 +114,30 @@ class HadoopTableReader( val tablePath = hiveTable.getPath val inputPathStr = applyFilterIfNeeded(tablePath, filterOpt) - // logDebug("Table input: %s".format(tablePath)) - val ifc = hiveTable.getInputFormatClass - .asInstanceOf[java.lang.Class[InputFormat[Writable, Writable]]] - val hadoopRDD = createHadoopRdd(tableDesc, inputPathStr, ifc) - - val attrsWithIndex = attributes.zipWithIndex - val mutableRow = new SpecificInternalRow(attributes.map(_.dataType)) - - val deserializedHadoopRDD = hadoopRDD.mapPartitions { iter => - val hconf = broadcastedHadoopConf.value.value - val deserializer = deserializerClass.newInstance() - deserializer.initialize(hconf, tableDesc.getProperties) - HadoopTableReader.fillObject(iter, deserializer, attrsWithIndex, mutableRow, deserializer) - } + val locationPath = new Path(inputPathStr) + val fs = locationPath.getFileSystem(sparkSession.sessionState.newHadoopConf()) + + // if the table location is not exists, return an empty RDD + if (!fs.exists(locationPath)) { + new EmptyRDD[InternalRow](sparkSession.sparkContext) + } else { + // logDebug("Table input: %s".format(tablePath)) + val ifc = hiveTable.getInputFormatClass + .asInstanceOf[java.lang.Class[InputFormat[Writable, Writable]]] + val hadoopRDD = createHadoopRdd(tableDesc, inputPathStr, ifc) - deserializedHadoopRDD + val attrsWithIndex = attributes.zipWithIndex + val mutableRow = new SpecificInternalRow(attributes.map(_.dataType)) + + val deserializedHadoopRDD = hadoopRDD.mapPartitions { iter => + val hconf = broadcastedHadoopConf.value.value + val deserializer = deserializerClass.newInstance() + deserializer.initialize(hconf, tableDesc.getProperties) + HadoopTableReader.fillObject(iter, deserializer, attrsWithIndex, mutableRow, deserializer) + } + + deserializedHadoopRDD + } } override def makeRDDForPartitionedTable(partitions: Seq[HivePartition]): RDD[InternalRow] = { diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala index b03f215ec9e5b..cc3949f57a78e 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala @@ -140,13 +140,6 @@ case class HiveTableScanExec( } protected override def doExecute(): RDD[InternalRow] = { - val locationPath = new Path(relation.catalogTable.location) - val fs = locationPath.getFileSystem(sparkSession.sessionState.newHadoopConf()) - - // if the table location is not exists, return an empty RDD - if 
(!fs.exists(locationPath)) { - return new EmptyRDD[InternalRow](sparkSession.sparkContext) - } // Using dummyCallSite, as getCallSite can turn out to be expensive with // with multiple partitions. val rdd = if (!relation.hiveQlTable.isPartitioned) { diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala index 99de85dbfe68d..0572fd0acf4de 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala @@ -1501,9 +1501,9 @@ class HiveDDLSuite withTempDir { dir => spark.sql( s""" - |CREATE TABLE t(a string, b int) - |USING hive - |OPTIONS(path "file:${dir.getCanonicalPath}") + |CREATE TABLE t(a string, b int) + |USING hive + |OPTIONS(path "file:${dir.getCanonicalPath}") """.stripMargin) val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t")) val expectedPath = s"file:${dir.getAbsolutePath.stripSuffix("/")}" @@ -1522,13 +1522,12 @@ class HiveDDLSuite assert(tableLocFile.exists()) checkAnswer(spark.table("t"), Row("c", 1) :: Nil) - val newDir = dir.getAbsolutePath.stripSuffix("/") + "/x" - val newDirFile = new File(newDir) - spark.sql(s"ALTER TABLE t SET LOCATION '$newDir'") + val newDirFile = new File(dir, "x") + spark.sql(s"ALTER TABLE t SET LOCATION '${newDirFile.getAbsolutePath}'") spark.sessionState.catalog.refreshTable(TableIdentifier("t")) val table1 = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t")) - assert(table1.location == newDir) + assert(table1.location.stripSuffix("/") == newDirFile.getAbsolutePath.stripSuffix("/")) assert(!newDirFile.exists()) spark.sql("INSERT INTO TABLE t SELECT 'c', 1") @@ -1543,10 +1542,10 @@ class HiveDDLSuite withTempDir { dir => spark.sql( s""" - |CREATE TABLE t(a int, b int, c int, d int) - |USING hive - |PARTITIONED BY(a, b) - |LOCATION "file:${dir.getCanonicalPath}" + |CREATE TABLE t(a int, b int, c int, d int) + |USING hive + |PARTITIONED BY(a, b) + |LOCATION "file:${dir.getCanonicalPath}" """.stripMargin) val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t")) val expectedPath = s"file:${dir.getAbsolutePath.stripSuffix("/")}" @@ -1563,9 +1562,9 @@ class HiveDDLSuite assert(partLoc.exists()) checkAnswer(spark.table("t"), Row(7, 8, 1, 2) :: Nil) - val newDir = dir.getAbsolutePath.stripSuffix("/") + "/x" - val newDirFile = new File(newDir) - spark.sql(s"ALTER TABLE t PARTITION(a=1, b=2) SET LOCATION '$newDir'") + val newDirFile = new File(dir, "x") + spark.sql(s"ALTER TABLE t PARTITION(a=1, b=2) SET LOCATION " + + s"'${newDirFile.getAbsolutePath}'") assert(!newDirFile.exists()) // insert into a partition which location does not exists. 
@@ -1581,9 +1580,9 @@ class HiveDDLSuite withTempDir { dir => spark.sql( s""" - |CREATE TABLE t(a string, b int) - |USING hive - |OPTIONS(path "file:${dir.getAbsolutePath}") + |CREATE TABLE t(a string, b int) + |USING hive + |OPTIONS(path "file:${dir.getAbsolutePath}") """.stripMargin) val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t")) val expectedPath = s"file:${dir.getAbsolutePath.stripSuffix("/")}" @@ -1592,12 +1591,12 @@ class HiveDDLSuite dir.delete() checkAnswer(spark.table("t"), Nil) - val newDir = dir.getAbsolutePath.stripSuffix("/") + "/x" - spark.sql(s"ALTER TABLE t SET LOCATION '$newDir'") + val newDirFile = new File(dir, "x") + spark.sql(s"ALTER TABLE t SET LOCATION '${newDirFile.getAbsolutePath}'") val table1 = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t")) - assert(table1.location == newDir) - assert(!new File(newDir).exists()) + assert(table1.location.stripSuffix("/") == newDirFile.getAbsolutePath.stripSuffix("/")) + assert(!newDirFile.exists()) checkAnswer(spark.table("t"), Nil) } } @@ -1608,19 +1607,19 @@ class HiveDDLSuite withTempDir { dir => spark.sql( s""" - |CREATE TABLE t(a int, b int, c int, d int) - |USING hive - |PARTITIONED BY(a, b) - |LOCATION "file:${dir.getCanonicalPath}" + |CREATE TABLE t(a int, b int, c int, d int) + |USING hive + |PARTITIONED BY(a, b) + |LOCATION "file:${dir.getCanonicalPath}" """.stripMargin) val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t")) spark.sql("INSERT INTO TABLE t PARTITION(a=1, b=2) SELECT 3, 4") checkAnswer(spark.table("t"), Row(3, 4, 1, 2) :: Nil) - val newDir = dir.getAbsolutePath.stripSuffix("/") + "/x" - val newDirFile = new File(newDir) - spark.sql(s"ALTER TABLE t PARTITION(a=1, b=2) SET LOCATION '$newDir'") + val newDirFile = new File(dir, "x") + spark.sql(s"ALTER TABLE t PARTITION(a=1, b=2) SET LOCATION " + + s"'${newDirFile.getAbsolutePath}'") assert(!newDirFile.exists()) // select from a partition which location has changed to a not existed location withSQLConf(SQLConf.HIVE_VERIFY_PARTITION_PATH.key -> "true") { From 6fb2b57fc2c43b1ad61c77e2b5017cbb5a0af386 Mon Sep 17 00:00:00 2001 From: windpiger Date: Tue, 21 Feb 2017 11:29:09 +0800 Subject: [PATCH 04/14] remove redundant import --- .../apache/spark/sql/hive/execution/HiveTableScanExec.scala | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala index cc3949f57a78e..140c352fa6f8d 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala @@ -20,14 +20,13 @@ package org.apache.spark.sql.hive.execution import scala.collection.JavaConverters._ import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.Path import org.apache.hadoop.hive.ql.metadata.{Partition => HivePartition} import org.apache.hadoop.hive.serde.serdeConstants import org.apache.hadoop.hive.serde2.objectinspector._ import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils -import org.apache.spark.rdd.{EmptyRDD, RDD} +import org.apache.spark.rdd.RDD import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ From 
b4caca761c9caffc39324b01b9e1aeecf2cc69fe Mon Sep 17 00:00:00 2001 From: windpiger Date: Tue, 21 Feb 2017 11:33:13 +0800 Subject: [PATCH 05/14] fix a comment --- .../src/main/scala/org/apache/spark/sql/hive/TableReader.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala index 38d4b78084da3..4547e135b2ae4 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala @@ -117,7 +117,7 @@ class HadoopTableReader( val locationPath = new Path(inputPathStr) val fs = locationPath.getFileSystem(sparkSession.sessionState.newHadoopConf()) - // if the table location is not exists, return an empty RDD + // if the table location does not exist, return an empty RDD if (!fs.exists(locationPath)) { new EmptyRDD[InternalRow](sparkSession.sparkContext) } else { From 119fa64b42de98b8e242586e5e218bde907d5a54 Mon Sep 17 00:00:00 2001 From: windpiger Date: Wed, 22 Feb 2017 14:52:56 +0800 Subject: [PATCH 06/14] modify some test case code --- .../apache/spark/sql/hive/TableReader.scala | 2 +- .../sql/hive/execution/HiveDDLSuite.scala | 35 ++++++++++--------- 2 files changed, 20 insertions(+), 17 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala index 4547e135b2ae4..f1fb9bab83633 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala @@ -115,7 +115,7 @@ class HadoopTableReader( val inputPathStr = applyFilterIfNeeded(tablePath, filterOpt) val locationPath = new Path(inputPathStr) - val fs = locationPath.getFileSystem(sparkSession.sessionState.newHadoopConf()) + val fs = locationPath.getFileSystem(broadcastedHadoopConf.value.value) // if the table location does not exist, return an empty RDD if (!fs.exists(locationPath)) { diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala index 0572fd0acf4de..6d3b10f10f373 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala @@ -1499,14 +1499,15 @@ class HiveDDLSuite test("insert data to a hive serde table which has a not existed location should succeed") { withTable("t") { withTempDir { dir => + val dirPath = dir.getAbsolutePath.stripSuffix("/") spark.sql( s""" |CREATE TABLE t(a string, b int) |USING hive - |OPTIONS(path "file:${dir.getCanonicalPath}") + |OPTIONS(path "file:$dirPath") """.stripMargin) val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t")) - val expectedPath = s"file:${dir.getAbsolutePath.stripSuffix("/")}" + val expectedPath = s"file:$dirPath" assert(table.location.stripSuffix("/") == expectedPath) val tableLocFile = new File(table.location.stripPrefix("file:")) @@ -1523,11 +1524,12 @@ class HiveDDLSuite checkAnswer(spark.table("t"), Row("c", 1) :: Nil) val newDirFile = new File(dir, "x") - spark.sql(s"ALTER TABLE t SET LOCATION '${newDirFile.getAbsolutePath}'") + val newDirPath = newDirFile.getAbsolutePath.stripSuffix("/") + spark.sql(s"ALTER TABLE t SET LOCATION '$newDirPath'") spark.sessionState.catalog.refreshTable(TableIdentifier("t")) val 
table1 = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t")) - assert(table1.location.stripSuffix("/") == newDirFile.getAbsolutePath.stripSuffix("/")) + assert(table1.location.stripSuffix("/") == newDirPath) assert(!newDirFile.exists()) spark.sql("INSERT INTO TABLE t SELECT 'c', 1") @@ -1540,21 +1542,22 @@ class HiveDDLSuite test("insert into a hive serde table with no existed partition location should succeed") { withTable("t") { withTempDir { dir => + val dirPath = dir.getAbsolutePath.stripSuffix("/") spark.sql( s""" |CREATE TABLE t(a int, b int, c int, d int) |USING hive |PARTITIONED BY(a, b) - |LOCATION "file:${dir.getCanonicalPath}" + |LOCATION "file:$dirPath" """.stripMargin) val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t")) - val expectedPath = s"file:${dir.getAbsolutePath.stripSuffix("/")}" + val expectedPath = s"file:$dirPath" assert(table.location.stripSuffix("/") == expectedPath) spark.sql("INSERT INTO TABLE t PARTITION(a=1, b=2) SELECT 3, 4") checkAnswer(spark.table("t"), Row(3, 4, 1, 2) :: Nil) - val partLoc = new File(s"${dir.getAbsolutePath}/a=1") + val partLoc = new File(s"$dirPath/a=1") Utils.deleteRecursively(partLoc) assert(!partLoc.exists()) // insert overwrite into a partition which location has been deleted. @@ -1563,8 +1566,8 @@ class HiveDDLSuite checkAnswer(spark.table("t"), Row(7, 8, 1, 2) :: Nil) val newDirFile = new File(dir, "x") - spark.sql(s"ALTER TABLE t PARTITION(a=1, b=2) SET LOCATION " + - s"'${newDirFile.getAbsolutePath}'") + val newDirPath = newDirFile.getAbsolutePath.stripSuffix("/") + spark.sql(s"ALTER TABLE t PARTITION(a=1, b=2) SET LOCATION '$newDirPath'") assert(!newDirFile.exists()) // insert into a partition which location does not exists. @@ -1578,6 +1581,7 @@ class HiveDDLSuite test("read data from a hive serde table which has a not existed location should succeed") { withTable("t") { withTempDir { dir => + val dirPath = dir.getAbsolutePath.stripSuffix("/") spark.sql( s""" |CREATE TABLE t(a string, b int) @@ -1585,17 +1589,18 @@ class HiveDDLSuite |OPTIONS(path "file:${dir.getAbsolutePath}") """.stripMargin) val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t")) - val expectedPath = s"file:${dir.getAbsolutePath.stripSuffix("/")}" + val expectedPath = s"file:$dirPath" assert(table.location.stripSuffix("/") == expectedPath) dir.delete() checkAnswer(spark.table("t"), Nil) val newDirFile = new File(dir, "x") - spark.sql(s"ALTER TABLE t SET LOCATION '${newDirFile.getAbsolutePath}'") + val newDirPath = newDirFile.getAbsolutePath.stripSuffix("/") + spark.sql(s"ALTER TABLE t SET LOCATION '$newDirPath'") val table1 = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t")) - assert(table1.location.stripSuffix("/") == newDirFile.getAbsolutePath.stripSuffix("/")) + assert(table1.location.stripSuffix("/") == newDirPath) assert(!newDirFile.exists()) checkAnswer(spark.table("t"), Nil) } @@ -1612,14 +1617,12 @@ class HiveDDLSuite |PARTITIONED BY(a, b) |LOCATION "file:${dir.getCanonicalPath}" """.stripMargin) - val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t")) - spark.sql("INSERT INTO TABLE t PARTITION(a=1, b=2) SELECT 3, 4") checkAnswer(spark.table("t"), Row(3, 4, 1, 2) :: Nil) val newDirFile = new File(dir, "x") - spark.sql(s"ALTER TABLE t PARTITION(a=1, b=2) SET LOCATION " + - s"'${newDirFile.getAbsolutePath}'") + val newDirPath = newDirFile.getAbsolutePath + spark.sql(s"ALTER TABLE t PARTITION(a=1, b=2) SET LOCATION '$newDirPath'") 
assert(!newDirFile.exists()) // select from a partition which location has changed to a not existed location withSQLConf(SQLConf.HIVE_VERIFY_PARTITION_PATH.key -> "true") { checkAnswer(spark.sql("select * from t where a=1 and b=2"), Nil) } spark.sql("INSERT INTO TABLE t PARTITION(a=1, b=2) SELECT 5, 6") checkAnswer(spark.table("t"), Row(5, 6, 1, 2) :: Nil) assert(newDirFile.exists()) // select from a partition which location has been deleted. Utils.deleteRecursively(newDirFile) assert(!newDirFile.exists()) withSQLConf(SQLConf.HIVE_VERIFY_PARTITION_PATH.key -> "true") { checkAnswer(spark.sql("select * from t where a=1 and b=2"), Nil) } } } } From 3870e19b01f504bf56f0db6834e75f98fff4f8c2 Mon Sep 17 00:00:00 2001 From: windpiger Date: Mon, 27 Feb 2017 13:59:29 +0800 Subject: [PATCH 07/14] fix when the table is created by 'stored by' --- .../scala/org/apache/spark/sql/hive/TableReader.scala | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala index f1fb9bab83633..83e774c531734 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala @@ -83,6 +83,8 @@ class HadoopTableReader( private val _broadcastedHadoopConf = sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf)) + private lazy val STORAGE_HANDLER_KEY = "storage_handler" + override def makeRDDForTable(hiveTable: HiveTable): RDD[InternalRow] = makeRDDForTable( hiveTable, @@ -117,8 +119,12 @@ class HadoopTableReader( val locationPath = new Path(inputPathStr) val fs = locationPath.getFileSystem(broadcastedHadoopConf.value.value) - // if the table location does not exist, return an empty RDD - if (!fs.exists(locationPath)) { + // if the location of the table which is not created by 'stored by' does not exist, + // return an empty RDD + // TODO: after SparkSQL supports 'stored by', we should check if this condition + // is still proper. + val storageHandler = hiveTable.getParameters.getOrDefault(STORAGE_HANDLER_KEY, null) + if (storageHandler == null && !fs.exists(locationPath)) { new EmptyRDD[InternalRow](sparkSession.sparkContext) } else { From 92d10679b5a07b34f6d5cfdb8cd27279165c95e3 Mon Sep 17 00:00:00 2001 From: windpiger Date: Mon, 27 Feb 2017 14:49:14 +0800 Subject: [PATCH 08/14] replace the STORAGE_HANDLER constant --- .../main/scala/org/apache/spark/sql/hive/TableReader.scala | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala index 83e774c531734..867e64a3f7bdd 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala @@ -83,8 +83,6 @@ class HadoopTableReader( private val _broadcastedHadoopConf = sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf)) - private lazy val STORAGE_HANDLER_KEY = "storage_handler" - override def makeRDDForTable(hiveTable: HiveTable): RDD[InternalRow] = makeRDDForTable( hiveTable, @@ -123,7 +121,7 @@ class HadoopTableReader( // return an empty RDD // TODO: after SparkSQL supports 'stored by', we should check if this condition // is still proper. 
- val storageHandler = hiveTable.getParameters.getOrDefault(STORAGE_HANDLER_KEY, null) + val storageHandler = hiveTable.getParameters.getOrDefault(META_TABLE_STORAGE, null) if (storageHandler == null && !fs.exists(locationPath)) { new EmptyRDD[InternalRow](sparkSession.sparkContext) } else { From 4f660d2211d597aa64105f925b981d081449c58d Mon Sep 17 00:00:00 2001 From: windpiger Date: Tue, 28 Feb 2017 22:02:25 +0800 Subject: [PATCH 09/14] modify test case --- .../sql/hive/execution/HiveDDLSuite.scala | 32 +++++++++---------- 1 file changed, 16 insertions(+), 16 deletions(-) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala index 6d3b10f10f373..d32004685a50a 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala @@ -1496,19 +1496,19 @@ class HiveDDLSuite } } - test("insert data to a hive serde table which has a not existed location should succeed") { + test("insert data to a hive serde table which has a non-existing location should succeed") { withTable("t") { withTempDir { dir => - val dirPath = dir.getAbsolutePath.stripSuffix("/") spark.sql( s""" |CREATE TABLE t(a string, b int) |USING hive - |OPTIONS(path "file:$dirPath") + |LOCATION '$dir' """.stripMargin) val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t")) - val expectedPath = s"file:$dirPath" - assert(table.location.stripSuffix("/") == expectedPath) + val dirPath = new Path(dir.getAbsolutePath) + val fs = dirPath.getFileSystem(spark.sessionState.newHadoopConf()) + assert(new Path(table.location) == fs.makeQualified(dirPath)) val tableLocFile = new File(table.location.stripPrefix("file:")) tableLocFile.delete() assert(!tableLocFile.exists()) spark.sql("INSERT INTO TABLE t SELECT 'c', 1") @@ -1539,20 +1539,20 @@ class HiveDDLSuite } } - test("insert into a hive serde table with no existed partition location should succeed") { + test("insert into a hive serde table with non-existing partition location should succeed") { withTable("t") { withTempDir { dir => - val dirPath = dir.getAbsolutePath.stripSuffix("/") spark.sql( s""" |CREATE TABLE t(a int, b int, c int, d int) |USING hive |PARTITIONED BY(a, b) - |LOCATION "file:$dirPath" + |LOCATION "$dir" """.stripMargin) val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t")) - val expectedPath = s"file:$dirPath" - assert(table.location.stripSuffix("/") == expectedPath) + val dirPath = new Path(dir.getAbsolutePath) + val fs = dirPath.getFileSystem(spark.sessionState.newHadoopConf()) + assert(new Path(table.location) == fs.makeQualified(dirPath)) spark.sql("INSERT INTO TABLE t PARTITION(a=1, b=2) SELECT 3, 4") checkAnswer(spark.table("t"), Row(3, 4, 1, 2) :: Nil) @@ -1578,19 +1578,19 @@ class HiveDDLSuite } } - test("read data from a hive serde table which has a not existed location should succeed") { + test("read data from a hive serde table which has a non-existing location should succeed") { withTable("t") { withTempDir { dir => - val dirPath = dir.getAbsolutePath.stripSuffix("/") spark.sql( s""" |CREATE TABLE t(a string, b int) |USING hive - |OPTIONS(path "file:${dir.getAbsolutePath}") + |LOCATION "$dir" """.stripMargin) val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t")) - val expectedPath = s"file:$dirPath" - assert(table.location.stripSuffix("/") == expectedPath) + val dirPath = new Path(dir.getAbsolutePath) + val fs = 
dirPath.getFileSystem(spark.sessionState.newHadoopConf()) + assert(new Path(table.location) == fs.makeQualified(dirPath)) dir.delete() checkAnswer(spark.table("t"), Nil) @@ -1607,7 +1607,7 @@ class HiveDDLSuite } } - test("read data from a hive serde table with no existed partition location should succeed") { + test("read data from a hive serde table with non-existing partition location should succeed") { withTable("t") { withTempDir { dir => spark.sql( From 7aa43b18183124d8b2c3cc11b0462cf053f16e20 Mon Sep 17 00:00:00 2001 From: windpiger Date: Tue, 28 Feb 2017 22:04:28 +0800 Subject: [PATCH 10/14] modify test case --- .../org/apache/spark/sql/hive/execution/HiveDDLSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala index d32004685a50a..e7698bcc16099 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala @@ -1615,7 +1615,7 @@ class HiveDDLSuite |CREATE TABLE t(a int, b int, c int, d int) |USING hive |PARTITIONED BY(a, b) - |LOCATION "file:${dir.getCanonicalPath}" + |LOCATION "$dir" """.stripMargin) spark.sql("INSERT INTO TABLE t PARTITION(a=1, b=2) SELECT 3, 4") checkAnswer(spark.table("t"), Row(3, 4, 1, 2) :: Nil) From 2456a940e2d8ffd15a58a91235a6ce6776d53c53 Mon Sep 17 00:00:00 2001 From: windpiger Date: Wed, 1 Mar 2017 12:21:39 +0800 Subject: [PATCH 11/14] fix test failure --- .../src/main/scala/org/apache/spark/sql/hive/TableReader.scala | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala index 7e5a5b48b2352..dda67c16c0113 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala @@ -111,14 +111,13 @@ class HadoopTableReader( // Create local references to member variables, so that the entire `this` object won't be // serialized in the closure below. - val localTableDesc = tableDesc val broadcastedHadoopConf = _broadcastedHadoopConf val tablePath = hiveTable.getPath val inputPathStr = applyFilterIfNeeded(tablePath, filterOpt) val locationPath = new Path(inputPathStr) - val fs = locationPath.getFileSystem(broadcastedHadoopConf.value.value) + val fs = locationPath.getFileSystem(_broadcastedHadoopConf.value.value) // if the location of the table which is not created by 'stored by' does not exist, // return an empty RDD From f4b4d29b4d75266411a8fc7366e5ade6facf516d Mon Sep 17 00:00:00 2001 From: windpiger Date: Wed, 1 Mar 2017 13:07:14 +0800 Subject: [PATCH 12/14] fix test failure --- .../main/scala/org/apache/spark/sql/hive/TableReader.scala | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala index dda67c16c0113..667101092d9dc 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala @@ -111,13 +111,14 @@ class HadoopTableReader( // Create local references to member variables, so that the entire `this` object won't be // serialized in the closure below. 
+ val localTableDesc = tableDesc val broadcastedHadoopConf = _broadcastedHadoopConf val tablePath = hiveTable.getPath val inputPathStr = applyFilterIfNeeded(tablePath, filterOpt) val locationPath = new Path(inputPathStr) - val fs = locationPath.getFileSystem(_broadcastedHadoopConf.value.value) + val fs = locationPath.getFileSystem(broadcastedHadoopConf.value.value) // if the location of the table which is not created by 'stored by' does not exist, // return an empty RDD @@ -130,7 +131,7 @@ class HadoopTableReader( // logDebug("Table input: %s".format(tablePath)) val ifc = hiveTable.getInputFormatClass .asInstanceOf[java.lang.Class[InputFormat[Writable, Writable]]] - val hadoopRDD = createHadoopRdd(tableDesc, inputPathStr, ifc) + val hadoopRDD = createHadoopRdd(localTableDesc, inputPathStr, ifc) val attrsWithIndex = attributes.zipWithIndex val mutableRow = new SpecificInternalRow(attributes.map(_.dataType)) @@ -138,7 +139,7 @@ class HadoopTableReader( val deserializedHadoopRDD = hadoopRDD.mapPartitions { iter => val hconf = broadcastedHadoopConf.value.value val deserializer = deserializerClass.newInstance() - deserializer.initialize(hconf, tableDesc.getProperties) + deserializer.initialize(hconf, localTableDesc.getProperties) HadoopTableReader.fillObject(iter, deserializer, attrsWithIndex, mutableRow, deserializer) } From 3dcd6c69bb91fa5d3f5800bfe8bbd8ea6f73a12e Mon Sep 17 00:00:00 2001 From: windpiger Date: Wed, 1 Mar 2017 16:25:11 +0800 Subject: [PATCH 13/14] modify some tests --- .../spark/sql/hive/execution/HiveDDLSuite.scala | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala index a9943fb0a199d..559e00021d3a7 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql.hive.execution import java.io.File +import java.net.URI import org.apache.hadoop.fs.Path import org.scalatest.BeforeAndAfterEach @@ -1603,7 +1604,7 @@ class HiveDDLSuite val fs = dirPath.getFileSystem(spark.sessionState.newHadoopConf()) assert(new Path(table.location) == fs.makeQualified(dirPath)) - val tableLocFile = new File(table.location.stripPrefix("file:")) + val tableLocFile = new File(new URI(table.location)) tableLocFile.delete() assert(!tableLocFile.exists()) spark.sql("INSERT INTO TABLE t SELECT 'c', 1") @@ -1617,12 +1618,12 @@ class HiveDDLSuite checkAnswer(spark.table("t"), Row("c", 1) :: Nil) val newDirFile = new File(dir, "x") - val newDirPath = newDirFile.getAbsolutePath.stripSuffix("/") + val newDirPath = newDirFile.getAbsolutePath spark.sql(s"ALTER TABLE t SET LOCATION '$newDirPath'") spark.sessionState.catalog.refreshTable(TableIdentifier("t")) val table1 = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t")) - assert(table1.location.stripSuffix("/") == newDirPath) + assert(table1.location == newDirPath) assert(!newDirFile.exists()) spark.sql("INSERT INTO TABLE t SELECT 'c', 1") @@ -1659,7 +1660,7 @@ class HiveDDLSuite checkAnswer(spark.table("t"), Row(7, 8, 1, 2) :: Nil) val newDirFile = new File(dir, "x") - val newDirPath = newDirFile.getAbsolutePath.stripSuffix("/") + val newDirPath = newDirFile.getAbsolutePath spark.sql(s"ALTER TABLE t PARTITION(a=1, b=2) SET LOCATION '$newDirPath'") assert(!newDirFile.exists()) @@ -1689,11 +1690,11 @@ 
class HiveDDLSuite checkAnswer(spark.table("t"), Nil) val newDirFile = new File(dir, "x") - val newDirPath = newDirFile.getAbsolutePath.stripSuffix("/") + val newDirPath = newDirFile.getAbsolutePath spark.sql(s"ALTER TABLE t SET LOCATION '$newDirPath'") val table1 = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t")) - assert(table1.location.stripSuffix("/") == newDirPath) + assert(table1.location == newDirPath) assert(!newDirFile.exists()) checkAnswer(spark.table("t"), Nil) } From 15c0a77714eb4ed5221f47d54ed31fcc10a95303 Mon Sep 17 00:00:00 2001 From: windpiger Date: Fri, 3 Mar 2017 15:54:58 +0800 Subject: [PATCH 14/14] add makeQualifiedPath func --- .../sql/hive/execution/HiveDDLSuite.scala | 28 ++++++++----------- 1 file changed, 12 insertions(+), 16 deletions(-) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala index 5f0b6a53fee7c..266146bcba29b 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala @@ -65,6 +65,12 @@ class HiveDDLSuite fs.exists(filesystemPath) } + private def makeQualifiedPath(path: String): Path = { + val hadoopPath = new Path(path) + val fs = hadoopPath.getFileSystem(sparkContext.hadoopConfiguration) + fs.makeQualified(hadoopPath) + } + test("drop tables") { withTable("tab1") { val tabName = "tab1" @@ -1600,9 +1606,7 @@ class HiveDDLSuite |LOCATION '$dir' """.stripMargin) val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t")) - val dirPath = new Path(dir.getAbsolutePath) - val fs = dirPath.getFileSystem(spark.sessionState.newHadoopConf()) - assert(new Path(table.location) == fs.makeQualified(dirPath)) + assert(new Path(table.location) == makeQualifiedPath(dir.getAbsolutePath)) val tableLocFile = new File(new URI(table.location)) tableLocFile.delete() @@ -1644,14 +1648,12 @@ class HiveDDLSuite |LOCATION "$dir" """.stripMargin) val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t")) - val dirPath = new Path(dir.getAbsolutePath) - val fs = dirPath.getFileSystem(spark.sessionState.newHadoopConf()) - assert(new Path(table.location) == fs.makeQualified(dirPath)) + assert(new Path(table.location) == makeQualifiedPath(dir.getAbsolutePath)) spark.sql("INSERT INTO TABLE t PARTITION(a=1, b=2) SELECT 3, 4") checkAnswer(spark.table("t"), Row(3, 4, 1, 2) :: Nil) - val partLoc = new File(s"$dirPath/a=1") + val partLoc = new File(dir, "a=1") Utils.deleteRecursively(partLoc) assert(!partLoc.exists()) // insert overwrite into a partition which location has been deleted. 
@@ -1682,9 +1684,7 @@ class HiveDDLSuite |LOCATION "$dir" """.stripMargin) val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t")) - val dirPath = new Path(dir.getAbsolutePath) - val fs = dirPath.getFileSystem(spark.sessionState.newHadoopConf()) - assert(new Path(table.location) == fs.makeQualified(dirPath)) + assert(new Path(table.location) == makeQualifiedPath(dir.getAbsolutePath)) dir.delete() checkAnswer(spark.table("t"), Nil) @@ -1800,10 +1800,8 @@ class HiveDDLSuite |LOCATION '$dir' |AS SELECT 3 as a, 4 as b, 1 as c, 2 as d """.stripMargin) - val dirPath = new Path(dir.getAbsolutePath) - val fs = dirPath.getFileSystem(spark.sessionState.newHadoopConf()) val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t")) - assert(new Path(table.location) == fs.makeQualified(dirPath)) + assert(new Path(table.location) == makeQualifiedPath(dir.getAbsolutePath)) checkAnswer(spark.table("t"), Row(3, 4, 1, 2)) } @@ -1821,10 +1819,8 @@ class HiveDDLSuite |LOCATION '$dir' |AS SELECT 3 as a, 4 as b, 1 as c, 2 as d """.stripMargin) - val dirPath = new Path(dir.getAbsolutePath) - val fs = dirPath.getFileSystem(spark.sessionState.newHadoopConf()) val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t1")) - assert(new Path(table.location) == fs.makeQualified(dirPath)) + assert(new Path(table.location) == makeQualifiedPath(dir.getAbsolutePath)) val partDir = new File(dir, "a=3") assert(partDir.exists())
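Reviewer note, separate from the patches above: the end-to-end behavior this series establishes can be sanity-checked with a small standalone program. The sketch below is illustrative only and is not code from the series — the object name, the table name t, and the temp-directory handling are hypothetical stand-ins for what the new HiveDDLSuite tests exercise through withTable/withTempDir — and it assumes a Spark build with Hive support running against a local filesystem.

    import java.io.File
    import java.nio.file.Files

    import org.apache.spark.sql.SparkSession

    // Minimal reproduction of SPARK-19575: create a Hive serde table, make its
    // location disappear, then read from and write to it again.
    object Spark19575Sketch {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .master("local[2]")
          .enableHiveSupport()
          .getOrCreate()

        val dir = Files.createTempDirectory("spark19575").toFile
        spark.sql(s"CREATE TABLE t(a STRING, b INT) USING hive LOCATION '${dir.getAbsolutePath}'")
        spark.sql("INSERT INTO TABLE t SELECT 'c', 1")

        // Delete the table location out from under the catalog entry.
        def deleteRecursively(f: File): Unit = {
          Option(f.listFiles).foreach(_.foreach(deleteRecursively))
          f.delete()
        }
        deleteRecursively(dir)

        // Before this series the scan failed because the input path did not
        // exist; with the patches applied it yields an empty result, and the
        // insert below recreates the location.
        spark.table("t").show()
        spark.sql("INSERT INTO TABLE t SELECT 'c', 1")
        spark.table("t").show()

        spark.sql("DROP TABLE t")
        spark.stop()
      }
    }

This mirrors what the new tests assert: a read of a table or partition whose location is missing returns an empty result (an EmptyRDD built in HadoopTableReader.makeRDDForTable, unless the table is backed by a storage handler), and a subsequent INSERT recreates the directory.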