diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
index 16c1103dd1ea3..667101092d9dc 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
@@ -117,22 +117,34 @@ class HadoopTableReader(
     val tablePath = hiveTable.getPath
     val inputPathStr = applyFilterIfNeeded(tablePath, filterOpt)
 
-    // logDebug("Table input: %s".format(tablePath))
-    val ifc = hiveTable.getInputFormatClass
-      .asInstanceOf[java.lang.Class[InputFormat[Writable, Writable]]]
-    val hadoopRDD = createHadoopRdd(localTableDesc, inputPathStr, ifc)
-
-    val attrsWithIndex = attributes.zipWithIndex
-    val mutableRow = new SpecificInternalRow(attributes.map(_.dataType))
-
-    val deserializedHadoopRDD = hadoopRDD.mapPartitions { iter =>
-      val hconf = broadcastedHadoopConf.value.value
-      val deserializer = deserializerClass.newInstance()
-      deserializer.initialize(hconf, localTableDesc.getProperties)
-      HadoopTableReader.fillObject(iter, deserializer, attrsWithIndex, mutableRow, deserializer)
-    }
+    val locationPath = new Path(inputPathStr)
+    val fs = locationPath.getFileSystem(broadcastedHadoopConf.value.value)
+
+    // If the table was not created with 'stored by' and its location does not exist,
+    // return an empty RDD.
+    // TODO: after SparkSQL supports 'stored by', we should check whether this condition
+    // is still appropriate.
+    val storageHandler = hiveTable.getParameters.getOrDefault(META_TABLE_STORAGE, null)
+    if (storageHandler == null && !fs.exists(locationPath)) {
+      new EmptyRDD[InternalRow](sparkSession.sparkContext)
+    } else {
+      // logDebug("Table input: %s".format(tablePath))
+      val ifc = hiveTable.getInputFormatClass
+        .asInstanceOf[java.lang.Class[InputFormat[Writable, Writable]]]
+      val hadoopRDD = createHadoopRdd(localTableDesc, inputPathStr, ifc)
 
-    deserializedHadoopRDD
+      val attrsWithIndex = attributes.zipWithIndex
+      val mutableRow = new SpecificInternalRow(attributes.map(_.dataType))
+
+      val deserializedHadoopRDD = hadoopRDD.mapPartitions { iter =>
+        val hconf = broadcastedHadoopConf.value.value
+        val deserializer = deserializerClass.newInstance()
+        deserializer.initialize(hconf, localTableDesc.getProperties)
+        HadoopTableReader.fillObject(iter, deserializer, attrsWithIndex, mutableRow, deserializer)
+      }
+
+      deserializedHadoopRDD
+    }
   }
 
   override def makeRDDForPartitionedTable(partitions: Seq[HivePartition]): RDD[InternalRow] = {
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index 81ae5b7bdb672..266146bcba29b 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -18,6 +18,7 @@ package org.apache.spark.sql.hive.execution
 
 import java.io.File
+import java.net.URI
 
 import org.apache.hadoop.fs.Path
 import org.scalatest.BeforeAndAfterEach
@@ -35,6 +36,7 @@ import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION
 import org.apache.spark.sql.test.SQLTestUtils
 import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.Utils
 
 class HiveDDLSuite
   extends QueryTest with SQLTestUtils with TestHiveSingleton with BeforeAndAfterEach {
@@ -63,6 +65,12 @@ class HiveDDLSuite
     fs.exists(filesystemPath)
   }
 
+  private def makeQualifiedPath(path: String): Path = {
+    val hadoopPath = new Path(path)
+    val fs = hadoopPath.getFileSystem(sparkContext.hadoopConfiguration)
+    fs.makeQualified(hadoopPath)
+  }
+
   test("drop tables") {
     withTable("tab1") {
       val tabName = "tab1"
@@ -1588,6 +1596,147 @@ class HiveDDLSuite
     }
   }
 
+  test("insert data to a hive serde table which has a non-existing location should succeed") {
+    withTable("t") {
+      withTempDir { dir =>
+        spark.sql(
+          s"""
+             |CREATE TABLE t(a string, b int)
+             |USING hive
+             |LOCATION '$dir'
+           """.stripMargin)
+        val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
+        assert(new Path(table.location) == makeQualifiedPath(dir.getAbsolutePath))
+
+        val tableLocFile = new File(new URI(table.location))
+        tableLocFile.delete()
+        assert(!tableLocFile.exists())
+        spark.sql("INSERT INTO TABLE t SELECT 'c', 1")
+        assert(tableLocFile.exists())
+        checkAnswer(spark.table("t"), Row("c", 1) :: Nil)
+
+        Utils.deleteRecursively(dir)
+        assert(!tableLocFile.exists())
+        spark.sql("INSERT OVERWRITE TABLE t SELECT 'c', 1")
+        assert(tableLocFile.exists())
+        checkAnswer(spark.table("t"), Row("c", 1) :: Nil)
+
+        val newDirFile = new File(dir, "x")
+        val newDirPath = newDirFile.getAbsolutePath
+        spark.sql(s"ALTER TABLE t SET LOCATION '$newDirPath'")
+        spark.sessionState.catalog.refreshTable(TableIdentifier("t"))
+
+        val table1 = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
+        assert(table1.location == newDirPath)
+        assert(!newDirFile.exists())
+
+        spark.sql("INSERT INTO TABLE t SELECT 'c', 1")
+        checkAnswer(spark.table("t"), Row("c", 1) :: Nil)
+        assert(newDirFile.exists())
+      }
+    }
+  }
+
+  test("insert into a hive serde table with non-existing partition location should succeed") {
+    withTable("t") {
+      withTempDir { dir =>
+        spark.sql(
+          s"""
+             |CREATE TABLE t(a int, b int, c int, d int)
+             |USING hive
+             |PARTITIONED BY(a, b)
+             |LOCATION "$dir"
+           """.stripMargin)
+        val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
+        assert(new Path(table.location) == makeQualifiedPath(dir.getAbsolutePath))
+
+        spark.sql("INSERT INTO TABLE t PARTITION(a=1, b=2) SELECT 3, 4")
+        checkAnswer(spark.table("t"), Row(3, 4, 1, 2) :: Nil)
+
+        val partLoc = new File(dir, "a=1")
+        Utils.deleteRecursively(partLoc)
+        assert(!partLoc.exists())
+        // insert overwrite into a partition whose location has been deleted.
+        spark.sql("INSERT OVERWRITE TABLE t PARTITION(a=1, b=2) SELECT 7, 8")
+        assert(partLoc.exists())
+        checkAnswer(spark.table("t"), Row(7, 8, 1, 2) :: Nil)
+
+        val newDirFile = new File(dir, "x")
+        val newDirPath = newDirFile.getAbsolutePath
+        spark.sql(s"ALTER TABLE t PARTITION(a=1, b=2) SET LOCATION '$newDirPath'")
+        assert(!newDirFile.exists())
+
+        // insert into a partition whose location does not exist.
+        spark.sql("INSERT INTO TABLE t PARTITION(a=1, b=2) SELECT 9, 10")
+        assert(newDirFile.exists())
+        checkAnswer(spark.table("t"), Row(9, 10, 1, 2) :: Nil)
+      }
+    }
+  }
+
+  test("read data from a hive serde table which has a non-existing location should succeed") {
+    withTable("t") {
+      withTempDir { dir =>
+        spark.sql(
+          s"""
+             |CREATE TABLE t(a string, b int)
+             |USING hive
+             |LOCATION "$dir"
+           """.stripMargin)
+        val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
+        assert(new Path(table.location) == makeQualifiedPath(dir.getAbsolutePath))
+
+        dir.delete()
+        checkAnswer(spark.table("t"), Nil)
+
+        val newDirFile = new File(dir, "x")
+        val newDirPath = newDirFile.getAbsolutePath
+        spark.sql(s"ALTER TABLE t SET LOCATION '$newDirPath'")
+
+        val table1 = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
+        assert(table1.location == newDirPath)
+        assert(!newDirFile.exists())
+        checkAnswer(spark.table("t"), Nil)
+      }
+    }
+  }
+
+  test("read data from a hive serde table with non-existing partition location should succeed") {
+    withTable("t") {
+      withTempDir { dir =>
+        spark.sql(
+          s"""
+             |CREATE TABLE t(a int, b int, c int, d int)
+             |USING hive
+             |PARTITIONED BY(a, b)
+             |LOCATION "$dir"
+           """.stripMargin)
+        spark.sql("INSERT INTO TABLE t PARTITION(a=1, b=2) SELECT 3, 4")
+        checkAnswer(spark.table("t"), Row(3, 4, 1, 2) :: Nil)
+
+        val newDirFile = new File(dir, "x")
+        val newDirPath = newDirFile.getAbsolutePath
+        spark.sql(s"ALTER TABLE t PARTITION(a=1, b=2) SET LOCATION '$newDirPath'")
+        assert(!newDirFile.exists())
+        // select from a partition whose location has been changed to a non-existent path.
+        withSQLConf(SQLConf.HIVE_VERIFY_PARTITION_PATH.key -> "true") {
+          checkAnswer(spark.sql("select * from t where a=1 and b=2"), Nil)
+        }
+
+        spark.sql("INSERT INTO TABLE t PARTITION(a=1, b=2) SELECT 5, 6")
+        checkAnswer(spark.table("t"), Row(5, 6, 1, 2) :: Nil)
+        assert(newDirFile.exists())
+
+        // select from a partition whose location has been deleted.
+        Utils.deleteRecursively(newDirFile)
+        assert(!newDirFile.exists())
+        withSQLConf(SQLConf.HIVE_VERIFY_PARTITION_PATH.key -> "true") {
+          checkAnswer(spark.sql("select * from t where a=1 and b=2"), Nil)
+        }
+      }
+    }
+  }
+
   Seq(true, false).foreach { shouldDelete =>
     val tcName = if (shouldDelete) "non-existent" else "existed"
     test(s"CTAS for external data source table with a $tcName location") {
@@ -1651,10 +1800,8 @@ class HiveDDLSuite
             |LOCATION '$dir'
             |AS SELECT 3 as a, 4 as b, 1 as c, 2 as d
           """.stripMargin)
-        val dirPath = new Path(dir.getAbsolutePath)
-        val fs = dirPath.getFileSystem(spark.sessionState.newHadoopConf())
         val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
-        assert(new Path(table.location) == fs.makeQualified(dirPath))
+        assert(new Path(table.location) == makeQualifiedPath(dir.getAbsolutePath))
 
         checkAnswer(spark.table("t"), Row(3, 4, 1, 2))
       }
@@ -1672,10 +1819,8 @@ class HiveDDLSuite
             |LOCATION '$dir'
             |AS SELECT 3 as a, 4 as b, 1 as c, 2 as d
           """.stripMargin)
-        val dirPath = new Path(dir.getAbsolutePath)
-        val fs = dirPath.getFileSystem(spark.sessionState.newHadoopConf())
         val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t1"))
-        assert(new Path(table.location) == fs.makeQualified(dirPath))
+        assert(new Path(table.location) == makeQualifiedPath(dir.getAbsolutePath))
 
         val partDir = new File(dir, "a=3")
         assert(partDir.exists())