From 5a6790398098452bd34f8fd21de495178a87239e Mon Sep 17 00:00:00 2001
From: Yin Huai
Date: Mon, 16 Feb 2015 16:21:54 -0800
Subject: [PATCH 1/3] Use data source write path for Hive's CTAS statements
 when no storage format/handler is specified.

---
 .../apache/spark/sql/hive/HiveContext.scala   |  3 +
 .../spark/sql/hive/HiveMetastoreCatalog.scala | 75 +++++++++++++++----
 .../sql/hive/execution/SQLQuerySuite.scala    | 68 ++++++++++++++++-
 3 files changed, 130 insertions(+), 16 deletions(-)

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
index 6c55bc6be17f9..c695b7bd5c6cf 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@@ -61,6 +61,9 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
   protected[sql] def convertMetastoreParquet: Boolean =
     getConf("spark.sql.hive.convertMetastoreParquet", "true") == "true"
 
+  protected[sql] def convertHiveCTASWithoutStorageSpec: Boolean =
+    getConf("spark.sql.sources.convertHiveCTASWithoutStorageSpec", "false") == "true"
+
   override protected[sql] def executePlan(plan: LogicalPlan): this.QueryExecution =
     new this.QueryExecution(plan)
 
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 0e43faa8afdaf..44451fe743e30 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -31,7 +31,7 @@ import org.apache.hadoop.hive.serde2.{Deserializer, SerDeException}
 import org.apache.hadoop.util.ReflectionUtils
 
 import org.apache.spark.Logging
-import org.apache.spark.sql.{AnalysisException, SQLContext}
+import org.apache.spark.sql.{SaveMode, AnalysisException, SQLContext}
 import org.apache.spark.sql.catalyst.analysis.{NoSuchTableException, Catalog, OverrideCatalog}
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.planning.PhysicalOperation
@@ -39,7 +39,7 @@ import org.apache.spark.sql.catalyst.plans.logical
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules._
 import org.apache.spark.sql.parquet.{ParquetRelation2, Partition => ParquetPartition, PartitionSpec}
-import org.apache.spark.sql.sources.{DDLParser, LogicalRelation, ResolvedDataSource}
+import org.apache.spark.sql.sources.{CreateTableUsingAsSelect, DDLParser, LogicalRelation, ResolvedDataSource}
 import org.apache.spark.sql.types._
 import org.apache.spark.util.Utils
 
@@ -499,24 +499,69 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
         Some(sa.getQB().getTableDesc)
       }
 
-      execution.CreateTableAsSelect(
-        databaseName,
-        tableName,
-        child,
-        allowExisting,
-        desc)
+      // Check if the query specifies file format or storage handler.
+      val hasStorageSpec = desc match {
+        case Some(crtTbl) =>
+          crtTbl != null && (crtTbl.getSerName != null || crtTbl.getStorageHandler != null)
+        case None => false
+      }
+
+      if (hive.convertHiveCTASWithoutStorageSpec && !hasStorageSpec) {
+        // Do the conversion when convertHiveCTASWithoutStorageSpec is true and the query
+        // does not specify any storage format (file format and storage handler).
+        if (dbName.isDefined) {
+          throw new AnalysisException(
+            "Cannot specify database name in a CTAS statement " +
+              "when spark.sql.sources.convertHiveCTASWithoutStorageSpec is set to true.")
+        }
+
+        val mode = if (allowExisting) SaveMode.Ignore else SaveMode.ErrorIfExists
+        CreateTableUsingAsSelect(
+          tblName,
+          hive.conf.defaultDataSourceName,
+          temporary = false,
+          mode,
+          options = Map.empty[String, String],
+          child
+        )
+      } else {
+        execution.CreateTableAsSelect(
+          databaseName,
+          tableName,
+          child,
+          allowExisting,
+          desc)
+      }
 
     case p: LogicalPlan if p.resolved => p
 
     case p @ CreateTableAsSelect(db, tableName, child, allowExisting, None) =>
       val (dbName, tblName) = processDatabaseAndTableName(db, tableName)
-      val databaseName = dbName.getOrElse(hive.sessionState.getCurrentDatabase)
-      execution.CreateTableAsSelect(
-        databaseName,
-        tableName,
-        child,
-        allowExisting,
-        None)
+      if (hive.convertHiveCTASWithoutStorageSpec) {
+        if (dbName.isDefined) {
+          throw new AnalysisException(
+            "Cannot specify database name in a CTAS statement " +
+              "when spark.sql.sources.convertHiveCTASWithoutStorageSpec is set to true.")
+        }
+
+        val mode = if (allowExisting) SaveMode.Ignore else SaveMode.ErrorIfExists
+        CreateTableUsingAsSelect(
+          tblName,
+          hive.conf.defaultDataSourceName,
+          temporary = false,
+          mode,
+          options = Map.empty[String, String],
+          child
+        )
+      } else {
+        val databaseName = dbName.getOrElse(hive.sessionState.getCurrentDatabase)
+        execution.CreateTableAsSelect(
+          databaseName,
+          tableName,
+          child,
+          allowExisting,
+          None)
+      }
   }
 }
 
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index e8d9eec3d88ff..4422bbdfd918a 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -17,10 +17,13 @@
 
 package org.apache.spark.sql.hive.execution
 
-import org.apache.spark.sql.hive.HiveShim
+import org.apache.spark.sql.catalyst.analysis.EliminateSubQueries
+import org.apache.spark.sql.hive.{MetastoreRelation, HiveShim}
 import org.apache.spark.sql.hive.test.TestHive
 import org.apache.spark.sql.hive.test.TestHive._
 import org.apache.spark.sql.hive.test.TestHive.implicits._
+import org.apache.spark.sql.parquet.ParquetRelation2
+import org.apache.spark.sql.sources.LogicalRelation
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.{AnalysisException, QueryTest, Row, SQLConf}
 
@@ -42,6 +45,69 @@ class SQLQuerySuite extends QueryTest {
     )
   }
 
+  test("CTAS without serde") {
+    def checkRelation(tableName: String, isDataSourceParquet: Boolean): Unit = {
+      val relation = EliminateSubQueries(catalog.lookupRelation(Seq(tableName)))
+      relation match {
+        case LogicalRelation(r: ParquetRelation2) =>
+          if (!isDataSourceParquet) {
+            fail(
+              s"${classOf[MetastoreRelation].getCanonicalName} is expected, but found " +
+                s"${ParquetRelation2.getClass.getCanonicalName}.")
+          }
+
+        case r: MetastoreRelation =>
+          if (isDataSourceParquet) {
+            fail(
+              s"${ParquetRelation2.getClass.getCanonicalName} is expected, but found " +
+                s"${classOf[MetastoreRelation].getCanonicalName}.")
+          }
+      }
+    }
+
+    val originalConf = getConf("spark.sql.sources.convertHiveCTASWithoutStorageSpec", "false")
+
+    setConf("spark.sql.sources.convertHiveCTASWithoutStorageSpec", "true")
+
+    sql("CREATE TABLE ctas1 AS SELECT key k, value FROM src ORDER BY k, value")
+    checkRelation("ctas1", true)
+    sql("DROP TABLE ctas1")
+
+    // Specifying a database name for a query that can be converted to the data
+    // source write path is not allowed right now.
+    val message = intercept[AnalysisException] {
+      sql("CREATE TABLE default.ctas1 AS SELECT key k, value FROM src ORDER BY k, value")
+    }.getMessage
+    assert(
+      message.contains("Cannot specify database name in a CTAS statement"),
+      "When spark.sql.sources.convertHiveCTASWithoutStorageSpec is true, we should not allow " +
+      "a database name to be specified.")
+
+    sql("CREATE TABLE ctas1 stored as textfile AS SELECT key k, value FROM src ORDER BY k, value")
+    checkRelation("ctas1", true)
+    sql("DROP TABLE ctas1")
+
+    sql(
+      "CREATE TABLE ctas1 stored as sequencefile AS SELECT key k, value FROM src ORDER BY k, value")
+    checkRelation("ctas1", true)
+    sql("DROP TABLE ctas1")
+
+    sql("CREATE TABLE ctas1 stored as rcfile AS SELECT key k, value FROM src ORDER BY k, value")
+    checkRelation("ctas1", false)
+    sql("DROP TABLE ctas1")
+
+    sql("CREATE TABLE ctas1 stored as orc AS SELECT key k, value FROM src ORDER BY k, value")
+    checkRelation("ctas1", false)
+    sql("DROP TABLE ctas1")
+
+    // Enable the following one after SPARK-5852 is fixed
+    // sql("CREATE TABLE ctas1 stored as parquet AS SELECT key k, value FROM src ORDER BY k, value")
+    // checkRelation("ctas1", false)
+    // sql("DROP TABLE ctas1")
+
+    setConf("spark.sql.sources.convertHiveCTASWithoutStorageSpec", originalConf)
+  }
+
   test("CTAS with serde") {
     sql("CREATE TABLE ctas1 AS SELECT key k, value FROM src ORDER BY k, value").collect()
     sql(

From 8af5b2ac388487c13c98ceef80ff1a8fb1a62217 Mon Sep 17 00:00:00 2001
From: Yin Huai
Date: Tue, 17 Feb 2015 12:20:24 -0800
Subject: [PATCH 2/3] Update conf key and unit test.

---
 .../org/apache/spark/sql/hive/HiveContext.scala  | 16 ++++++++++++++--
 .../spark/sql/hive/HiveMetastoreCatalog.scala    |  8 ++++----
 .../spark/sql/hive/execution/commands.scala      |  6 ++++--
 .../spark/sql/hive/execution/SQLQuerySuite.scala | 15 ++++++++++-----
 4 files changed, 32 insertions(+), 13 deletions(-)

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
index c695b7bd5c6cf..46307bd031189 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@@ -61,8 +61,20 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
   protected[sql] def convertMetastoreParquet: Boolean =
     getConf("spark.sql.hive.convertMetastoreParquet", "true") == "true"
 
-  protected[sql] def convertHiveCTASWithoutStorageSpec: Boolean =
-    getConf("spark.sql.sources.convertHiveCTASWithoutStorageSpec", "false") == "true"
+  /**
+   * When true, a table created by a Hive CTAS statement (no USING clause) will be
+   * converted to a data source table, using the data source set by spark.sql.sources.default.
+   * The table in the CTAS statement will be converted when it meets any of the following conditions:
+   *   - The CTAS does not specify any of a SerDe (ROW FORMAT SERDE), a File Format (STORED AS), or
+   *     a Storage Handler (STORED BY), and the value of hive.default.fileformat in hive-site.xml
+   *     is either TextFile or SequenceFile.
+   *   - The CTAS statement specifies TextFile (STORED AS TEXTFILE) as the file format and no SerDe
+   *     is specified (no ROW FORMAT SERDE clause).
+   *   - The CTAS statement specifies SequenceFile (STORED AS SEQUENCEFILE) as the file format
+   *     and no SerDe is specified (no ROW FORMAT SERDE clause).
+   */
+  protected[sql] def convertCTAS: Boolean =
+    getConf("spark.sql.hive.convertCTAS", "false") == "true"
 
   override protected[sql] def executePlan(plan: LogicalPlan): this.QueryExecution =
     new this.QueryExecution(plan)
 
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 44451fe743e30..ed53fd482ad6f 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -506,13 +506,13 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
         case None => false
       }
 
-      if (hive.convertHiveCTASWithoutStorageSpec && !hasStorageSpec) {
+      if (hive.convertCTAS && !hasStorageSpec) {
         // Do the conversion when convertHiveCTASWithoutStorageSpec is true and the query
         // does not specify any storage format (file format and storage handler).
         if (dbName.isDefined) {
           throw new AnalysisException(
             "Cannot specify database name in a CTAS statement " +
-              "when spark.sql.sources.convertHiveCTASWithoutStorageSpec is set to true.")
+              "when spark.sql.hive.convertCTAS is set to true.")
         }
 
         val mode = if (allowExisting) SaveMode.Ignore else SaveMode.ErrorIfExists
@@ -537,11 +537,11 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
     case p @ CreateTableAsSelect(db, tableName, child, allowExisting, None) =>
       val (dbName, tblName) = processDatabaseAndTableName(db, tableName)
 
-      if (hive.convertHiveCTASWithoutStorageSpec) {
+      if (hive.convertCTAS) {
         if (dbName.isDefined) {
           throw new AnalysisException(
             "Cannot specify database name in a CTAS statement " +
-              "when spark.sql.sources.convertHiveCTASWithoutStorageSpec is set to true.")
+              "when spark.sql.hive.convertCTAS is set to true.")
         }
 
         val mode = if (allowExisting) SaveMode.Ignore else SaveMode.ErrorIfExists
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
index 0aa5f7f7b88bd..2883207056704 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
@@ -165,8 +165,10 @@ case class CreateMetastoreDataSourceAsSelect(
       mode match {
         case SaveMode.ErrorIfExists =>
           sys.error(s"Table $tableName already exists. " +
-            s"If you want to append into it, please set mode to SaveMode.Append. " +
-            s"Or, if you want to overwrite it, please set mode to SaveMode.Overwrite.")
+            s"If you are using saveAsTable, you can set SaveMode to SaveMode.Append to" +
+            s"insert data into the table or set SaveMode to SaveMode.Overwrite to overwrite" +
+            s"the existing data. " +
+            s"Or, if you are using SQL CREATE TABLE, you need to drop $tableName first.")
         case SaveMode.Ignore =>
           // Since the table already exists and the save mode is Ignore, we will just return.
           return Seq.empty[Row]
 
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index 4422bbdfd918a..be19070065b89 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -65,22 +65,27 @@ class SQLQuerySuite extends QueryTest {
       }
     }
 
-    val originalConf = getConf("spark.sql.sources.convertHiveCTASWithoutStorageSpec", "false")
+    val originalConf = getConf("spark.sql.hive.convertCTAS", "false")
 
-    setConf("spark.sql.sources.convertHiveCTASWithoutStorageSpec", "true")
+    setConf("spark.sql.hive.convertCTAS", "true")
 
     sql("CREATE TABLE ctas1 AS SELECT key k, value FROM src ORDER BY k, value")
+    sql("CREATE TABLE IF NOT EXISTS ctas1 AS SELECT key k, value FROM src ORDER BY k, value")
+    var message = intercept[RuntimeException] {
+      sql("CREATE TABLE ctas1 AS SELECT key k, value FROM src ORDER BY k, value")
+    }.getMessage
+    assert(message.contains("Table ctas1 already exists"))
     checkRelation("ctas1", true)
     sql("DROP TABLE ctas1")
 
     // Specifying a database name for a query that can be converted to the data
     // source write path is not allowed right now.
-    val message = intercept[AnalysisException] {
+    message = intercept[AnalysisException] {
       sql("CREATE TABLE default.ctas1 AS SELECT key k, value FROM src ORDER BY k, value")
     }.getMessage
     assert(
       message.contains("Cannot specify database name in a CTAS statement"),
-      "When spark.sql.sources.convertHiveCTASWithoutStorageSpec is true, we should not allow " +
+      "When spark.sql.hive.convertCTAS is true, we should not allow " +
       "a database name to be specified.")
 
     sql("CREATE TABLE ctas1 stored as textfile AS SELECT key k, value FROM src ORDER BY k, value")
@@ -105,7 +110,7 @@ class SQLQuerySuite extends QueryTest {
     // checkRelation("ctas1", false)
     // sql("DROP TABLE ctas1")
 
-    setConf("spark.sql.sources.convertHiveCTASWithoutStorageSpec", originalConf)
+    setConf("spark.sql.hive.convertCTAS", originalConf)
   }
 
   test("CTAS with serde") {

From ad2b07d4d39e9c0476889a04693a776eaab014a3 Mon Sep 17 00:00:00 2001
From: Yin Huai
Date: Tue, 17 Feb 2015 16:47:30 -0800
Subject: [PATCH 3/3] Update tests and error messages.

---
 .../org/apache/spark/sql/hive/HiveContext.scala    |  2 +-
 .../spark/sql/hive/HiveMetastoreCatalog.scala      |  2 +-
 .../spark/sql/hive/execution/commands.scala        | 13 +++++++------
 .../spark/sql/hive/MetastoreDataSourcesSuite.scala |  6 +++---
 .../spark/sql/hive/execution/SQLQuerySuite.scala   |  9 ++++-----
 5 files changed, 16 insertions(+), 16 deletions(-)

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
index 46307bd031189..d3365b1e8f44c 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@@ -74,7 +74,7 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
    * and no SerDe is specified (no ROW FORMAT SERDE clause).
    */
   protected[sql] def convertCTAS: Boolean =
-    getConf("spark.sql.hive.convertCTAS", "false") == "true"
+    getConf("spark.sql.hive.convertCTAS", "false").toBoolean
 
   override protected[sql] def executePlan(plan: LogicalPlan): this.QueryExecution =
     new this.QueryExecution(plan)
 
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index ed53fd482ad6f..e15e39ff71939 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -507,7 +507,7 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
       }
 
       if (hive.convertCTAS && !hasStorageSpec) {
-        // Do the conversion when convertHiveCTASWithoutStorageSpec is true and the query
+        // Do the conversion when spark.sql.hive.convertCTAS is true and the query
         // does not specify any storage format (file format and storage handler).
         if (dbName.isDefined) {
           throw new AnalysisException(
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
index 2883207056704..4c2c21deea100 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.hive.execution
 
 import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.analysis.EliminateSubQueries
 import org.apache.spark.sql.catalyst.util._
 import org.apache.spark.sql.sources._
@@ -117,7 +118,7 @@ case class CreateMetastoreDataSource(
       if (allowExisting) {
         return Seq.empty[Row]
       } else {
-        sys.error(s"Table $tableName already exists.")
+        throw new AnalysisException(s"Table $tableName already exists.")
       }
     }
 
@@ -164,8 +165,8 @@ case class CreateMetastoreDataSourceAsSelect(
       // Check if we need to throw an exception or just return.
       mode match {
         case SaveMode.ErrorIfExists =>
-          sys.error(s"Table $tableName already exists. " +
-            s"If you are using saveAsTable, you can set SaveMode to SaveMode.Append to" +
-            s"insert data into the table or set SaveMode to SaveMode.Overwrite to overwrite" +
+          throw new AnalysisException(s"Table $tableName already exists. " +
+            s"If you are using saveAsTable, you can set SaveMode to SaveMode.Append to " +
+            s"insert data into the table or set SaveMode to SaveMode.Overwrite to overwrite " +
             s"the existing data. " +
             s"Or, if you are using SQL CREATE TABLE, you need to drop $tableName first.")
         case SaveMode.Ignore =>
@@ -193,7 +194,7 @@ case class CreateMetastoreDataSourceAsSelect(
             s"== Actual Schema ==" +:
             createdRelation.schema.treeString.split("\\n")).mkString("\n")}
           """.stripMargin
-          sys.error(errorMessage)
+          throw new AnalysisException(errorMessage)
         } else if (i != createdRelation.relation) {
           val errorDescription =
             s"Cannot append to table $tableName because the resolved relation does not " +
@@ -210,10 +211,10 @@ case class CreateMetastoreDataSourceAsSelect(
             s"== Actual Relation ==" ::
             createdRelation.toString :: Nil).mkString("\n")}
           """.stripMargin
-          sys.error(errorMessage)
+          throw new AnalysisException(errorMessage)
       case o =>
-        sys.error(s"Saving data in ${o.toString} is not supported.")
+        throw new AnalysisException(s"Saving data in ${o.toString} is not supported.")
     }
   case SaveMode.Overwrite =>
     hiveContext.sql(s"DROP TABLE IF EXISTS $tableName")
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
index 0263e3bb56617..ef895f14eb35d 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
@@ -303,8 +303,8 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach {
         |SELECT * FROM jsonTable
       """.stripMargin)
 
-    // Create the table again should trigger a AlreadyExistsException.
-    val message = intercept[RuntimeException] {
+    // Creating the table again should trigger an AnalysisException.
+    val message = intercept[AnalysisException] {
       sql(
         s"""
         |CREATE TABLE ctasJsonTable
@@ -513,7 +513,7 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach {
       sql("SELECT * FROM createdJsonTable"),
       df.collect())
 
-    var message = intercept[RuntimeException] {
+    var message = intercept[AnalysisException] {
       createExternalTable("createdJsonTable", filePath.toString)
     }.getMessage
     assert(message.contains("Table createdJsonTable already exists."),
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index be19070065b89..d161092a62556 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -71,7 +71,7 @@ class SQLQuerySuite extends QueryTest {
 
     sql("CREATE TABLE ctas1 AS SELECT key k, value FROM src ORDER BY k, value")
     sql("CREATE TABLE IF NOT EXISTS ctas1 AS SELECT key k, value FROM src ORDER BY k, value")
-    var message = intercept[RuntimeException] {
+    var message = intercept[AnalysisException] {
      sql("CREATE TABLE ctas1 AS SELECT key k, value FROM src ORDER BY k, value")
     }.getMessage
     assert(message.contains("Table ctas1 already exists"))
@@ -105,10 +105,9 @@ class SQLQuerySuite extends QueryTest {
     checkRelation("ctas1", false)
     sql("DROP TABLE ctas1")
 
-    // Enable the following one after SPARK-5852 is fixed
-    // sql("CREATE TABLE ctas1 stored as parquet AS SELECT key k, value FROM src ORDER BY k, value")
-    // checkRelation("ctas1", false)
-    // sql("DROP TABLE ctas1")
+    sql("CREATE TABLE ctas1 stored as parquet AS SELECT key k, value FROM src ORDER BY k, value")
+    checkRelation("ctas1", false)
+    sql("DROP TABLE ctas1")
 
     setConf("spark.sql.hive.convertCTAS", originalConf)
   }