From 4e06830dbfa41f972ea5e63c656f39ac009f1dbb Mon Sep 17 00:00:00 2001
From: Wenchen Fan
Date: Fri, 6 Jan 2017 23:26:53 +0800
Subject: [PATCH 1/3] support creating hive table with DataFrameWriter and Catalog

---
 .../apache/spark/sql/DataFrameWriter.scala    |  7 +--
 .../spark/sql/internal/CatalogImpl.scala      |  4 --
 .../sql/hive/execution/HiveDDLSuite.scala     | 63 +++++++++++++++++++
 3 files changed, 65 insertions(+), 9 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 3127ebf67967..bd9f7252aa60 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -26,7 +26,6 @@ import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.{EliminateSubqueryAliases, UnresolvedRelation}
 import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogTable, CatalogTableType}
 import org.apache.spark.sql.catalyst.plans.logical.InsertIntoTable
-import org.apache.spark.sql.execution.command.DDLUtils
 import org.apache.spark.sql.execution.datasources.{CreateTable, DataSource, LogicalRelation}
 import org.apache.spark.sql.sources.BaseRelation
 import org.apache.spark.sql.types.StructType
@@ -361,10 +360,6 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
   }
 
   private def saveAsTable(tableIdent: TableIdentifier): Unit = {
-    if (source.toLowerCase == DDLUtils.HIVE_PROVIDER) {
-      throw new AnalysisException("Cannot create hive serde table with saveAsTable API")
-    }
-
     val catalog = df.sparkSession.sessionState.catalog
     val tableExists = catalog.tableExists(tableIdent)
     val db = tableIdent.database.getOrElse(catalog.getCurrentDatabase)
@@ -385,6 +380,8 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
         }
         EliminateSubqueryAliases(catalog.lookupRelation(tableIdentWithDB)) match {
           // Only do the check if the table is a data source table (the relation is a BaseRelation).
+          // TODO(cloud-fan): also check hive table relation here when we support overwrite mode
+          // for creating hive tables.
           case LogicalRelation(dest: BaseRelation, _, _) if srcRelations.contains(dest) =>
             throw new AnalysisException(
               s"Cannot overwrite table $tableName that is also being read from")
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
index 41ed9d71809e..8244b2152c1c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
@@ -347,10 +347,6 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
       source: String,
       schema: StructType,
       options: Map[String, String]): DataFrame = {
-    if (source.toLowerCase == "hive") {
-      throw new AnalysisException("Cannot create hive serde table with createExternalTable API.")
-    }
-
     val tableIdent = sparkSession.sessionState.sqlParser.parseTableIdentifier(tableName)
     val tableDesc = CatalogTable(
       identifier = tableIdent,
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index 3ac07d093381..40d593a1adec 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -33,6 +33,7 @@ import org.apache.spark.sql.hive.test.TestHiveSingleton
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION
 import org.apache.spark.sql.test.SQLTestUtils
+import org.apache.spark.sql.types.StructType
 
 class HiveDDLSuite
   extends QueryTest with SQLTestUtils with TestHiveSingleton with BeforeAndAfterEach {
@@ -1289,4 +1290,66 @@ class HiveDDLSuite
       }
     }
   }
+
+  test("create hive serde table with Catalog") {
+    withTable("t") {
+      withTempDir { dir =>
+        val df = spark.catalog.createExternalTable(
+          "t",
+          "hive",
+          new StructType().add("i", "int"),
+          Map("path" -> dir.getCanonicalPath, "fileFormat" -> "parquet"))
+        assert(df.collect().isEmpty)
+
+        val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
+        assert(DDLUtils.isHiveTable(table))
+        assert(table.storage.inputFormat ==
+          Some("org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat"))
+        assert(table.storage.outputFormat ==
+          Some("org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat"))
+        assert(table.storage.serde ==
+          Some("org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe"))
+
+        sql("INSERT INTO t SELECT 1")
+        checkAnswer(spark.table("t"), Row(1))
+      }
+    }
+  }
+
+  test("create hive serde table with DataFrameWriter.saveAsTable") {
+    withTable("t", "t2") {
+      Seq(1 -> "a").toDF("i", "j")
+        .write.format("hive").option("fileFormat", "avro").saveAsTable("t")
+      checkAnswer(spark.table("t"), Row(1, "a"))
+
+      val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
+      assert(DDLUtils.isHiveTable(table))
+      assert(table.storage.inputFormat ==
+        Some("org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat"))
+      assert(table.storage.outputFormat ==
+        Some("org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat"))
+      assert(table.storage.serde ==
+        Some("org.apache.hadoop.hive.serde2.avro.AvroSerDe"))
+
+      sql("INSERT INTO t SELECT 2, 'b'")
+      checkAnswer(spark.table("t"), Row(1, "a") :: Row(2, "b") :: Nil)
+
+      val e = intercept[AnalysisException] {
+        Seq(1 -> "a").toDF("i", "j").write.format("hive").partitionBy("i").saveAsTable("t2")
+      }
assert(e.message.contains("A Create Table As Select (CTAS) statement is not allowed " + + "to create a partitioned table using Hive")) + + val e2 = intercept[AnalysisException] { + Seq(1 -> "a").toDF("i", "j").write.format("hive").bucketBy(4, "i").saveAsTable("t2") + } + assert(e2.message.contains("Creating bucketed Hive serde table is not supported yet")) + + val e3 = intercept[AnalysisException] { + spark.table("t").write.format("hive").mode("overwrite").saveAsTable("t") + } + assert(e3.message.contains( + "CTAS for hive serde tables does not support append or overwrite semantics")) + } + } } From 27d97e56295465bb717da9ff08d2571998adf2b1 Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Tue, 10 Jan 2017 11:47:17 +0800 Subject: [PATCH 2/3] address comments --- .../apache/spark/sql/DataFrameReader.scala | 14 ++++++++----- .../apache/spark/sql/DataFrameWriter.scala | 6 ++++++ .../sql/hive/MetastoreDataSourcesSuite.scala | 20 ------------------- .../sql/hive/execution/HiveDDLSuite.scala | 14 +++++++++++++ 4 files changed, 29 insertions(+), 25 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala index 365b50dee93c..cd8383617844 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala @@ -28,6 +28,7 @@ import org.apache.spark.annotation.InterfaceStability import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.json.{JacksonParser, JSONOptions} import org.apache.spark.sql.execution.LogicalRDD +import org.apache.spark.sql.execution.command.DDLUtils import org.apache.spark.sql.execution.datasources.DataSource import org.apache.spark.sql.execution.datasources.jdbc._ import org.apache.spark.sql.execution.datasources.json.InferSchema @@ -143,6 +144,11 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging { */ @scala.annotation.varargs def load(paths: String*): DataFrame = { + if (source.toLowerCase == DDLUtils.HIVE_PROVIDER) { + throw new AnalysisException("Hive data source can only be used with tables, you can not " + + "read files of Hive data source directly.") + } + sparkSession.baseRelationToDataFrame( DataSource.apply( sparkSession, @@ -160,7 +166,7 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging { */ def jdbc(url: String, table: String, properties: Properties): DataFrame = { // properties should override settings in extraOptions. 
-    this.extraOptions = this.extraOptions ++ properties.asScala
+    this.extraOptions ++= properties.asScala
     // explicit url and dbtable should override all
     this.extraOptions += (JDBCOptions.JDBC_URL -> url, JDBCOptions.JDBC_TABLE_NAME -> table)
     format("jdbc").load()
@@ -469,9 +475,7 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
    * @since 1.4.0
    */
   def table(tableName: String): DataFrame = {
-    Dataset.ofRows(sparkSession,
-      sparkSession.sessionState.catalog.lookupRelation(
-        sparkSession.sessionState.sqlParser.parseTableIdentifier(tableName)))
+    sparkSession.table(tableName)
   }
 
   /**
@@ -550,6 +554,6 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
 
   private var userSpecifiedSchema: Option[StructType] = None
 
-  private var extraOptions = new scala.collection.mutable.HashMap[String, String]
+  private val extraOptions = new scala.collection.mutable.HashMap[String, String]
 
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index bd9f7252aa60..82331fdb9b01 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -26,6 +26,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.{EliminateSubqueryAliases, UnresolvedRelation}
 import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogTable, CatalogTableType}
 import org.apache.spark.sql.catalyst.plans.logical.InsertIntoTable
+import org.apache.spark.sql.execution.command.DDLUtils
 import org.apache.spark.sql.execution.datasources.{CreateTable, DataSource, LogicalRelation}
 import org.apache.spark.sql.sources.BaseRelation
 import org.apache.spark.sql.types.StructType
@@ -204,6 +205,11 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
    * @since 1.4.0
    */
   def save(): Unit = {
+    if (source.toLowerCase == DDLUtils.HIVE_PROVIDER) {
+      throw new AnalysisException("Hive data source can only be used with tables, you cannot " +
+        "write files of Hive data source directly.")
+    }
+
     assertNotBucketed("save")
     val dataSource = DataSource(
       df.sparkSession,
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
index aed825e2f3d2..13ef79e3b77d 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
@@ -1169,26 +1169,6 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
     }
   }
 
-  test("save API - format hive") {
-    withTempDir { dir =>
-      val path = dir.getCanonicalPath
-      val e = intercept[ClassNotFoundException] {
-        spark.range(10).write.format("hive").mode(SaveMode.Ignore).save(path)
-      }.getMessage
-      assert(e.contains("Failed to find data source: hive"))
-    }
-  }
-
-  test("saveAsTable API - format hive") {
-    val tableName = "tab1"
-    withTable(tableName) {
-      val e = intercept[AnalysisException] {
-        spark.range(10).write.format("hive").mode(SaveMode.Overwrite).saveAsTable(tableName)
-      }.getMessage
-      assert(e.contains("Cannot create hive serde table with saveAsTable API"))
-    }
-  }
-
   test("create a temp view using hive") {
     val tableName = "tab1"
     withTable (tableName) {
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index 40d593a1adec..77285282a6a6 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -1352,4 +1352,18 @@ class HiveDDLSuite
         "CTAS for hive serde tables does not support append or overwrite semantics"))
     }
   }
+
+  test("read/write files with hive data source is not allowed") {
+    withTempDir { dir =>
+      val e = intercept[AnalysisException] {
+        spark.read.format("hive").load(dir.getAbsolutePath)
+      }
+      assert(e.message.contains("Hive data source can only be used with tables"))
+
+      val e2 = intercept[AnalysisException] {
+        Seq(1 -> "a").toDF("i", "j").write.format("hive").save(dir.getAbsolutePath)
+      }
+      assert(e2.message.contains("Hive data source can only be used with tables"))
+    }
+  }
 }

From 9c48f934d2094310b0e763c6d14045eb4e19aa2a Mon Sep 17 00:00:00 2001
From: Wenchen Fan
Date: Tue, 10 Jan 2017 16:52:54 +0800
Subject: [PATCH 3/3] fix test

---
 .../scala/org/apache/spark/sql/internal/CatalogSuite.scala | 7 -------
 1 file changed, 7 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala
index 89ec162c8ed5..5dd04543ed14 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala
@@ -322,13 +322,6 @@ class CatalogSuite
     assert(e2.message == "Cannot create a file-based external data source table without path")
   }
 
-  test("createExternalTable should fail if provider is hive") {
-    val e = intercept[AnalysisException] {
-      spark.catalog.createExternalTable("tbl", "HiVe", Map.empty[String, String])
-    }
-    assert(e.message.contains("Cannot create hive serde table with createExternalTable API"))
-  }
-
   test("dropTempView should not un-cache and drop metastore table if a same-name table exists") {
     withTable("same_name") {
       spark.range(10).write.saveAsTable("same_name")
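
Taken together, the three patches allow "hive" to be used as a provider when creating tables through DataFrameWriter.saveAsTable and Catalog.createExternalTable, while rejecting direct file reads/writes with the hive source. A minimal usage sketch follows; the "hive" provider string, the "fileFormat" and "path" option keys, and the Avro/Parquet formats come from the tests added above, while the session setup, table names, and paths are illustrative assumptions only.

  import org.apache.spark.sql.SparkSession
  import org.apache.spark.sql.types.StructType

  // Hive serde tables require a Hive-enabled session.
  val spark = SparkSession.builder().enableHiveSupport().getOrCreate()
  import spark.implicits._

  // CTAS-style creation of a Hive serde table via DataFrameWriter.saveAsTable.
  // partitionBy/bucketBy and append/overwrite save modes are rejected for now,
  // as the new HiveDDLSuite tests show.
  Seq(1 -> "a").toDF("i", "j")
    .write.format("hive").option("fileFormat", "avro").saveAsTable("hive_avro_tbl")

  // External Hive serde table over an existing location via the Catalog API.
  spark.catalog.createExternalTable(
    "hive_parquet_tbl",                        // illustrative table name
    "hive",
    new StructType().add("i", "int"),
    Map("path" -> "/tmp/hive_parquet_tbl",     // illustrative path
        "fileFormat" -> "parquet"))

  // Reading or writing files directly with the hive source is not allowed:
  // spark.read.format("hive").load("/some/path")   // throws AnalysisException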