42 changes: 27 additions & 15 deletions sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
@@ -117,22 +117,34 @@ class HadoopTableReader(
val tablePath = hiveTable.getPath
val inputPathStr = applyFilterIfNeeded(tablePath, filterOpt)

-    // logDebug("Table input: %s".format(tablePath))
-    val ifc = hiveTable.getInputFormatClass
-      .asInstanceOf[java.lang.Class[InputFormat[Writable, Writable]]]
-    val hadoopRDD = createHadoopRdd(localTableDesc, inputPathStr, ifc)
-
-    val attrsWithIndex = attributes.zipWithIndex
-    val mutableRow = new SpecificInternalRow(attributes.map(_.dataType))
-
-    val deserializedHadoopRDD = hadoopRDD.mapPartitions { iter =>
-      val hconf = broadcastedHadoopConf.value.value
-      val deserializer = deserializerClass.newInstance()
-      deserializer.initialize(hconf, localTableDesc.getProperties)
-      HadoopTableReader.fillObject(iter, deserializer, attrsWithIndex, mutableRow, deserializer)
-    }
+    val locationPath = new Path(inputPathStr)
+    val fs = locationPath.getFileSystem(broadcastedHadoopConf.value.value)
+
+    // if the location of the table which is not created by 'stored by' does not exist,
+    // return an empty RDD
+    // TODO: after SparkSQL supports 'stored by', we should check if this condition
+    // is still proper.
+    val storageHandler = hiveTable.getParameters.getOrDefault(META_TABLE_STORAGE, null)
+    if (storageHandler == null && !fs.exists(locationPath)) {
+      new EmptyRDD[InternalRow](sparkSession.sparkContext)
+    } else {
+      // logDebug("Table input: %s".format(tablePath))
+      val ifc = hiveTable.getInputFormatClass
+        .asInstanceOf[java.lang.Class[InputFormat[Writable, Writable]]]
+      val hadoopRDD = createHadoopRdd(localTableDesc, inputPathStr, ifc)
+
-    deserializedHadoopRDD
+      val attrsWithIndex = attributes.zipWithIndex
+      val mutableRow = new SpecificInternalRow(attributes.map(_.dataType))
+
+      val deserializedHadoopRDD = hadoopRDD.mapPartitions { iter =>
+        val hconf = broadcastedHadoopConf.value.value
+        val deserializer = deserializerClass.newInstance()
+        deserializer.initialize(hconf, localTableDesc.getProperties)
+        HadoopTableReader.fillObject(iter, deserializer, attrsWithIndex, mutableRow, deserializer)
+      }
+
+      deserializedHadoopRDD
}
}

override def makeRDDForPartitionedTable(partitions: Seq[HivePartition]): RDD[InternalRow] = {
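
A note on the hunk above: the scan is now skipped entirely when the table's location no longer exists and the table is not backed by a Hive storage handler (the META_TABLE_STORAGE property, i.e. a 'stored by' table); such reads return an empty RDD instead of failing. A minimal, self-contained sketch of the same guard, under assumed names (scanOrEmpty, buildScan) and with the public sc.emptyRDD standing in for the package-private EmptyRDD used in the PR:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

// Sketch only: skip the scan when the location is gone and no storage handler
// owns the data; otherwise fall through to the normal Hadoop RDD build.
def scanOrEmpty[T: scala.reflect.ClassTag](
    sc: SparkContext,
    hadoopConf: Configuration,
    locationStr: String,
    storageHandler: Option[String])(buildScan: => RDD[T]): RDD[T] = {
  val location = new Path(locationStr)
  val fs = location.getFileSystem(hadoopConf)
  if (storageHandler.isEmpty && !fs.exists(location)) {
    sc.emptyRDD[T]   // missing location: no rows, no exception
  } else {
    buildScan        // location exists (or handler-managed): scan as before
  }
}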
sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -18,6 +18,7 @@
package org.apache.spark.sql.hive.execution

import java.io.File
import java.net.URI

import org.apache.hadoop.fs.Path
import org.scalatest.BeforeAndAfterEach
@@ -35,6 +36,7 @@ import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION
import org.apache.spark.sql.test.SQLTestUtils
import org.apache.spark.sql.types.StructType
import org.apache.spark.util.Utils

class HiveDDLSuite
extends QueryTest with SQLTestUtils with TestHiveSingleton with BeforeAndAfterEach {
@@ -63,6 +65,12 @@ class HiveDDLSuite
fs.exists(filesystemPath)
}

private def makeQualifiedPath(path: String): Path = {
val hadoopPath = new Path(path)
val fs = hadoopPath.getFileSystem(sparkContext.hadoopConfiguration)
fs.makeQualified(hadoopPath)
}

test("drop tables") {
withTable("tab1") {
val tabName = "tab1"
@@ -1588,6 +1596,147 @@ class HiveDDLSuite
}
}

test("insert data to a hive serde table which has a non-existing location should succeed") {
withTable("t") {
withTempDir { dir =>
spark.sql(
s"""
|CREATE TABLE t(a string, b int)
|USING hive
|LOCATION '$dir'
""".stripMargin)

[Review thread on the LOCATION '$dir' line]
Contributor: can we just call dir.delete before creating this table?
Contributor Author: ok~
Contributor Author (windpiger, Mar 2, 2017): @cloud-fan I found the dir will be created by CREATE TABLE, so we should keep the current logic.
Contributor: does hive have the same behavior?
Contributor Author: yes, I tested it in Hive: create table test(a string) location 'hdfs:/xx'; then hdfs:/xx will be created.
Contributor: seems the InMemoryCatalog doesn't do this, you can send a new PR to fix it.
Contributor Author: ok thanks~
val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
assert(new Path(table.location) == makeQualifiedPath(dir.getAbsolutePath))

val tableLocFile = new File(new URI(table.location))
tableLocFile.delete()
assert(!tableLocFile.exists())
spark.sql("INSERT INTO TABLE t SELECT 'c', 1")
assert(tableLocFile.exists())
checkAnswer(spark.table("t"), Row("c", 1) :: Nil)

Utils.deleteRecursively(dir)
assert(!tableLocFile.exists())
spark.sql("INSERT OVERWRITE TABLE t SELECT 'c', 1")
assert(tableLocFile.exists())
checkAnswer(spark.table("t"), Row("c", 1) :: Nil)

val newDirFile = new File(dir, "x")
val newDirPath = newDirFile.getAbsolutePath
spark.sql(s"ALTER TABLE t SET LOCATION '$newDirPath'")
spark.sessionState.catalog.refreshTable(TableIdentifier("t"))

val table1 = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
assert(table1.location == newDirPath)
assert(!newDirFile.exists())

spark.sql("INSERT INTO TABLE t SELECT 'c', 1")
checkAnswer(spark.table("t"), Row("c", 1) :: Nil)
assert(newDirFile.exists())
}
}
}
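
The review thread above explains why the test deletes the table location after CREATE TABLE rather than before it: with the Hive catalog, creating a table with an explicit LOCATION also creates that directory, so a pre-delete would simply be undone. A hedged illustration of that claim (the table name, scratch directory, and the available SparkSession named spark are assumptions for the example):

import java.io.File

val demoDir = new File("/tmp/hive_loc_demo")   // assumed scratch path
demoDir.delete()                               // start without the directory
spark.sql(s"CREATE TABLE loc_demo(a STRING) USING hive LOCATION '${demoDir.getAbsolutePath}'")
assert(demoDir.exists())                       // the catalog (re)created the location dir
spark.sql("DROP TABLE loc_demo")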

test("insert into a hive serde table with non-existing partition location should succeed") {
withTable("t") {
withTempDir { dir =>
spark.sql(
s"""
|CREATE TABLE t(a int, b int, c int, d int)
|USING hive
|PARTITIONED BY(a, b)
|LOCATION "$dir"
""".stripMargin)
val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
assert(new Path(table.location) == makeQualifiedPath(dir.getAbsolutePath))

spark.sql("INSERT INTO TABLE t PARTITION(a=1, b=2) SELECT 3, 4")
checkAnswer(spark.table("t"), Row(3, 4, 1, 2) :: Nil)

val partLoc = new File(dir, "a=1")
Utils.deleteRecursively(partLoc)
assert(!partLoc.exists())
// insert overwrite into a partition whose location has been deleted.
spark.sql("INSERT OVERWRITE TABLE t PARTITION(a=1, b=2) SELECT 7, 8")
assert(partLoc.exists())
checkAnswer(spark.table("t"), Row(7, 8, 1, 2) :: Nil)

val newDirFile = new File(dir, "x")
val newDirPath = newDirFile.getAbsolutePath
spark.sql(s"ALTER TABLE t PARTITION(a=1, b=2) SET LOCATION '$newDirPath'")
assert(!newDirFile.exists())

// insert into a partition whose location does not exist.
spark.sql("INSERT INTO TABLE t PARTITION(a=1, b=2) SELECT 9, 10")
assert(newDirFile.exists())
checkAnswer(spark.table("t"), Row(9, 10, 1, 2) :: Nil)
}
}
}

test("read data from a hive serde table which has a non-existing location should succeed") {
withTable("t") {
withTempDir { dir =>
spark.sql(
s"""
|CREATE TABLE t(a string, b int)
|USING hive
|LOCATION "$dir"
""".stripMargin)
val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
assert(new Path(table.location) == makeQualifiedPath(dir.getAbsolutePath))

dir.delete()
checkAnswer(spark.table("t"), Nil)

val newDirFile = new File(dir, "x")
val newDirPath = newDirFile.getAbsolutePath
spark.sql(s"ALTER TABLE t SET LOCATION '$newDirPath'")

val table1 = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
assert(table1.location == newDirPath)
assert(!newDirFile.exists())
checkAnswer(spark.table("t"), Nil)
}
}
}

test("read data from a hive serde table with non-existing partition location should succeed") {
withTable("t") {
withTempDir { dir =>
spark.sql(
s"""
|CREATE TABLE t(a int, b int, c int, d int)
|USING hive
|PARTITIONED BY(a, b)
|LOCATION "$dir"
""".stripMargin)
spark.sql("INSERT INTO TABLE t PARTITION(a=1, b=2) SELECT 3, 4")
checkAnswer(spark.table("t"), Row(3, 4, 1, 2) :: Nil)

val newDirFile = new File(dir, "x")
val newDirPath = newDirFile.getAbsolutePath
spark.sql(s"ALTER TABLE t PARTITION(a=1, b=2) SET LOCATION '$newDirPath'")
assert(!newDirFile.exists())
// select from a partition whose location has been changed to a non-existent location
withSQLConf(SQLConf.HIVE_VERIFY_PARTITION_PATH.key -> "true") {
checkAnswer(spark.sql("select * from t where a=1 and b=2"), Nil)
}

[Review thread on the withSQLConf(HIVE_VERIFY_PARTITION_PATH) line]
Contributor: why setting this conf?
Contributor Author: if we don't set it, it will throw an exception; if we set it, it checks whether the partition path exists and just returns an empty RDD instead of throwing, even if the path does not exist.
Contributor: is this expected? I think hive will always return an empty result, right?
Contributor: BTW this conf will be removed soon, as it has bugs.
Contributor Author: ok~ thanks~ then do we also need to modify something here?
Contributor Author: yes, hive returns empty. If there is a bug here (could you describe what the bug is?), we can remove the conf and always return the result?

spark.sql("INSERT INTO TABLE t PARTITION(a=1, b=2) SELECT 5, 6")
checkAnswer(spark.table("t"), Row(5, 6, 1, 2) :: Nil)
assert(newDirFile.exists())

// select from a partition whose location has been deleted.
Utils.deleteRecursively(newDirFile)
assert(!newDirFile.exists())
withSQLConf(SQLConf.HIVE_VERIFY_PARTITION_PATH.key -> "true") {
checkAnswer(spark.sql("select * from t where a=1 and b=2"), Nil)
}
}
}
}
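
Per the thread embedded in the test above, spark.sql.hive.verifyPartitionPath (SQLConf.HIVE_VERIFY_PARTITION_PATH) is what turns a read of a missing partition location into an empty result instead of an exception, and the reviewers note the flag is slated for removal. A hedged sketch of toggling it around a query, assuming a SparkSession named spark and the partitioned table t from the test:

// Sketch only: enable the flag, run the query, then restore the previous value.
val key = "spark.sql.hive.verifyPartitionPath"
val previous = spark.conf.getOption(key)
spark.conf.set(key, "true")
try {
  // With verification on, a partition whose directory is missing contributes no rows.
  spark.sql("SELECT * FROM t WHERE a = 1 AND b = 2").show()
} finally {
  previous match {
    case Some(v) => spark.conf.set(key, v)
    case None => spark.conf.unset(key)
  }
}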

Seq(true, false).foreach { shouldDelete =>
val tcName = if (shouldDelete) "non-existent" else "existed"
test(s"CTAS for external data source table with a $tcName location") {
@@ -1651,10 +1800,8 @@
|LOCATION '$dir'
|AS SELECT 3 as a, 4 as b, 1 as c, 2 as d
""".stripMargin)
-val dirPath = new Path(dir.getAbsolutePath)
-val fs = dirPath.getFileSystem(spark.sessionState.newHadoopConf())
val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
-assert(new Path(table.location) == fs.makeQualified(dirPath))
+assert(new Path(table.location) == makeQualifiedPath(dir.getAbsolutePath))

checkAnswer(spark.table("t"), Row(3, 4, 1, 2))
}
@@ -1672,10 +1819,8 @@
|LOCATION '$dir'
|AS SELECT 3 as a, 4 as b, 1 as c, 2 as d
""".stripMargin)
-val dirPath = new Path(dir.getAbsolutePath)
-val fs = dirPath.getFileSystem(spark.sessionState.newHadoopConf())
val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t1"))
-assert(new Path(table.location) == fs.makeQualified(dirPath))
+assert(new Path(table.location) == makeQualifiedPath(dir.getAbsolutePath))

val partDir = new File(dir, "a=3")
assert(partDir.exists())
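
The last two hunks drop the per-test Path/FileSystem boilerplate in favor of the makeQualifiedPath helper added near the top of the suite; qualification simply resolves a raw path against its filesystem so it carries a scheme (and authority). A small stand-alone sketch of what that qualification computes, using an arbitrary example path and the default Hadoop configuration:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

val raw = new Path("/tmp/some/table/dir")          // arbitrary example path
val fs = raw.getFileSystem(new Configuration())    // default (local) Hadoop conf
println(fs.makeQualified(raw))                     // e.g. file:/tmp/some/table/dir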