@@ -28,6 +28,7 @@ import org.apache.spark.annotation.InterfaceStability
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.json.{JacksonParser, JSONOptions}
import org.apache.spark.sql.execution.LogicalRDD
import org.apache.spark.sql.execution.command.DDLUtils
import org.apache.spark.sql.execution.datasources.DataSource
import org.apache.spark.sql.execution.datasources.jdbc._
import org.apache.spark.sql.execution.datasources.json.InferSchema
@@ -143,6 +144,11 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
*/
@scala.annotation.varargs
def load(paths: String*): DataFrame = {
if (source.toLowerCase == DDLUtils.HIVE_PROVIDER) {
throw new AnalysisException("Hive data source can only be used with tables, you can not " +
"read files of Hive data source directly.")
}

sparkSession.baseRelationToDataFrame(
DataSource.apply(
sparkSession,
@@ -160,7 +166,7 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
*/
def jdbc(url: String, table: String, properties: Properties): DataFrame = {
// properties should override settings in extraOptions.
this.extraOptions = this.extraOptions ++ properties.asScala
this.extraOptions ++= properties.asScala
// explicit url and dbtable should override all
this.extraOptions += (JDBCOptions.JDBC_URL -> url, JDBCOptions.JDBC_TABLE_NAME -> table)
format("jdbc").load()
@@ -469,9 +475,7 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
* @since 1.4.0
*/
def table(tableName: String): DataFrame = {
Dataset.ofRows(sparkSession,
sparkSession.sessionState.catalog.lookupRelation(
sparkSession.sessionState.sqlParser.parseTableIdentifier(tableName)))
sparkSession.table(tableName)
gatorsmile (Member), Jan 10, 2017:

+1
}

/**
@@ -550,6 +554,6 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {

private var userSpecifiedSchema: Option[StructType] = None

private var extraOptions = new scala.collection.mutable.HashMap[String, String]
private val extraOptions = new scala.collection.mutable.HashMap[String, String]

}
@@ -205,6 +205,11 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
* @since 1.4.0
*/
def save(): Unit = {
if (source.toLowerCase == DDLUtils.HIVE_PROVIDER) {
throw new AnalysisException("Hive data source can only be used with tables, you can not " +
"write files of Hive data source directly.")
}

assertNotBucketed("save")
val dataSource = DataSource(
df.sparkSession,
@@ -361,10 +366,6 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
}

private def saveAsTable(tableIdent: TableIdentifier): Unit = {
if (source.toLowerCase == DDLUtils.HIVE_PROVIDER) {
throw new AnalysisException("Cannot create hive serde table with saveAsTable API")
}

val catalog = df.sparkSession.sessionState.catalog
val tableExists = catalog.tableExists(tableIdent)
val db = tableIdent.database.getOrElse(catalog.getCurrentDatabase)
@@ -385,6 +386,8 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
}
EliminateSubqueryAliases(catalog.lookupRelation(tableIdentWithDB)) match {
// Only do the check if the table is a data source table (the relation is a BaseRelation).
// TODO(cloud-fan): also check hive table relation here when we support overwrite mode
// for creating hive tables.
Member:

+1
Member:

Are we also facing the same issue in the insertInto(tableIdent: TableIdentifier) API?
Contributor Author:

insertInto is different; it generates an InsertIntoTable plan instead of a CreateTable plan.

Member:
      Seq((1, 2)).toDF("i", "j").write.format("parquet").saveAsTable(tableName)
      table(tableName).write.mode(SaveMode.Overwrite).insertInto(tableName)

We captured the exceptions when the format is parquet. Now, when the format is hive, should we do the same thing?

Contributor Author:

DataFrameWriter.insertInto will ignore the specified provider, won't it?

Member:

Although we ignore the specified provider, we still respect the actual format of the table. For example, below is a Hive table. We are not blocking it. Should we block it to make them consistent?

      sql(s"CREATE TABLE $tableName STORED AS SEQUENCEFILE AS SELECT 1 AS key, 'abc' AS value")
      val df = sql(s"SELECT key, value FROM $tableName")
      df.write.mode("overwrite").insertInto(tableName)

Contributor Author:

We should not block it. This generates InsertIntoTable, and it supports Hive tables. What we should block is saveAsTable with Overwrite mode, which generates CreateTable.

insert overwrite is different from create table with overwrite mode.
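For illustration only (not part of this PR's diff), a minimal sketch of the distinction being discussed, assuming the spark-shell/test context where spark and sql are in scope; the table name tbl is hypothetical:

      // Assumed setup: an existing Hive serde table (hypothetical name "tbl").
      sql("CREATE TABLE tbl STORED AS SEQUENCEFILE AS SELECT 1 AS key, 'abc' AS value")

      // Not blocked: insertInto generates an InsertIntoTable plan, which supports
      // Hive tables, even in overwrite mode.
      spark.table("tbl").write.mode("overwrite").insertInto("tbl")

      // Blocked: saveAsTable in overwrite mode generates a CreateTable plan, and CTAS
      // for Hive serde tables does not support append or overwrite semantics.
      spark.table("tbl").write.format("hive").mode("overwrite").saveAsTable("tbl")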

case LogicalRelation(dest: BaseRelation, _, _) if srcRelations.contains(dest) =>
throw new AnalysisException(
s"Cannot overwrite table $tableName that is also being read from")
@@ -347,10 +347,6 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
source: String,
schema: StructType,
options: Map[String, String]): DataFrame = {
if (source.toLowerCase == "hive") {
throw new AnalysisException("Cannot create hive serde table with createExternalTable API.")
}

val tableIdent = sparkSession.sessionState.sqlParser.parseTableIdentifier(tableName)
val tableDesc = CatalogTable(
identifier = tableIdent,
@@ -322,13 +322,6 @@ class CatalogSuite
assert(e2.message == "Cannot create a file-based external data source table without path")
}

test("createExternalTable should fail if provider is hive") {
val e = intercept[AnalysisException] {
spark.catalog.createExternalTable("tbl", "HiVe", Map.empty[String, String])
}
assert(e.message.contains("Cannot create hive serde table with createExternalTable API"))
}

test("dropTempView should not un-cache and drop metastore table if a same-name table exists") {
withTable("same_name") {
spark.range(10).write.saveAsTable("same_name")
@@ -1169,26 +1169,6 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
}
}

test("save API - format hive") {
withTempDir { dir =>
val path = dir.getCanonicalPath
val e = intercept[ClassNotFoundException] {
spark.range(10).write.format("hive").mode(SaveMode.Ignore).save(path)
}.getMessage
assert(e.contains("Failed to find data source: hive"))
}
}

test("saveAsTable API - format hive") {
val tableName = "tab1"
withTable(tableName) {
val e = intercept[AnalysisException] {
spark.range(10).write.format("hive").mode(SaveMode.Overwrite).saveAsTable(tableName)
}.getMessage
assert(e.contains("Cannot create hive serde table with saveAsTable API"))
}
}

test("create a temp view using hive") {
val tableName = "tab1"
withTable (tableName) {
@@ -33,6 +33,7 @@ import org.apache.spark.sql.hive.test.TestHiveSingleton
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION
import org.apache.spark.sql.test.SQLTestUtils
import org.apache.spark.sql.types.StructType

class HiveDDLSuite
extends QueryTest with SQLTestUtils with TestHiveSingleton with BeforeAndAfterEach {
@@ -1289,4 +1290,80 @@ class HiveDDLSuite
}
}
}

test("create hive serde table with Catalog") {
withTable("t") {
withTempDir { dir =>
val df = spark.catalog.createExternalTable(
"t",
"hive",
new StructType().add("i", "int"),
Map("path" -> dir.getCanonicalPath, "fileFormat" -> "parquet"))
Member:

If path is not provided, it still works. However, based on our latest design decision, users must provide a path when creating an external table.

Contributor Author:

In the design decision, we want to hide the managed/external concept from users. I'm not sure if we want to rename this API...

Member:

Maybe just issue an exception when users do not provide a path? Otherwise, we have to add new APIs.

Contributor Author:

I'll address this problem in a follow-up PR; other data sources also have this problem, e.g. users can create an external parquet table without a path, so this PR doesn't introduce new problems.

assert(df.collect().isEmpty)

val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
assert(DDLUtils.isHiveTable(table))
assert(table.storage.inputFormat ==
Some("org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat"))
assert(table.storage.outputFormat ==
Some("org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat"))
assert(table.storage.serde ==
Some("org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe"))

sql("INSERT INTO t SELECT 1")
checkAnswer(spark.table("t"), Row(1))
}
}
}

test("create hive serde table with DataFrameWriter.saveAsTable") {
withTable("t", "t2") {
Seq(1 -> "a").toDF("i", "j")
.write.format("hive").option("fileFormat", "avro").saveAsTable("t")
checkAnswer(spark.table("t"), Row(1, "a"))

val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
assert(DDLUtils.isHiveTable(table))
assert(table.storage.inputFormat ==
Some("org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat"))
assert(table.storage.outputFormat ==
Some("org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat"))
assert(table.storage.serde ==
Some("org.apache.hadoop.hive.serde2.avro.AvroSerDe"))

sql("INSERT INTO t SELECT 2, 'b'")
checkAnswer(spark.table("t"), Row(1, "a") :: Row(2, "b") :: Nil)

val e = intercept[AnalysisException] {
Seq(1 -> "a").toDF("i", "j").write.format("hive").partitionBy("i").saveAsTable("t2")
}
assert(e.message.contains("A Create Table As Select (CTAS) statement is not allowed " +
"to create a partitioned table using Hive"))

val e2 = intercept[AnalysisException] {
Seq(1 -> "a").toDF("i", "j").write.format("hive").bucketBy(4, "i").saveAsTable("t2")
}
assert(e2.message.contains("Creating bucketed Hive serde table is not supported yet"))

val e3 = intercept[AnalysisException] {
spark.table("t").write.format("hive").mode("overwrite").saveAsTable("t")
}
assert(e3.message.contains(
"CTAS for hive serde tables does not support append or overwrite semantics"))
}
}

test("read/write files with hive data source is not allowed") {
withTempDir { dir =>
val e = intercept[AnalysisException] {
spark.read.format("hive").load(dir.getAbsolutePath)
}
assert(e.message.contains("Hive data source can only be used with tables"))

val e2 = intercept[AnalysisException] {
Seq(1 -> "a").toDF("i", "j").write.format("hive").save(dir.getAbsolutePath)
}
assert(e2.message.contains("Hive data source can only be used with tables"))
}
}
}