
Commit 22d6ac4

gatorsmile authored and cmonkey committed
[SPARK-19229][SQL] Disallow Creating Hive Source Tables when Hive Support is Not Enabled
### What changes were proposed in this pull request?

It is odd to allow creating Hive source tables when using InMemoryCatalog, since we are unable to operate on them afterwards. This PR blocks users from creating Hive source tables when Hive support is not enabled.

### How was this patch tested?

Fixed the existing test cases.

Author: gatorsmile <[email protected]>

Closes apache#16587 from gatorsmile/blockHiveTable.
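For illustration, here is a minimal sketch of the behavior this patch enforces, assuming a Spark build without Hive support (so the session falls back to InMemoryCatalog); the session setup and table names are hypothetical:

```scala
import org.apache.spark.sql.{AnalysisException, SparkSession}

// Assumption: this build has no Hive support, so the catalog is InMemoryCatalog.
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("block-hive-tables-sketch")
  .getOrCreate()

// Data source tables still work fine with the in-memory catalog.
spark.sql("CREATE TABLE ok_tab (name STRING, age INT) USING parquet")

// Without a USING clause, CREATE TABLE parses as a Hive source table.
// This patch rejects it at analysis time instead of recording a table
// that can never be read or written.
try {
  spark.sql("CREATE TABLE hive_tab (name STRING, age INT)")
} catch {
  case e: AnalysisException =>
    println(e.getMessage) // expected to mention: Hive support is required to CREATE Hive TABLE
}
```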
1 parent 6ee7b90 commit 22d6ac4

18 files changed (+72, -93 lines)

python/pyspark/sql/tests.py

Lines changed: 4 additions & 4 deletions

```diff
@@ -1717,8 +1717,8 @@ def test_list_tables(self):
         self.assertEquals(spark.catalog.listTables(), [])
         self.assertEquals(spark.catalog.listTables("some_db"), [])
         spark.createDataFrame([(1, 1)]).createOrReplaceTempView("temp_tab")
-        spark.sql("CREATE TABLE tab1 (name STRING, age INT)")
-        spark.sql("CREATE TABLE some_db.tab2 (name STRING, age INT)")
+        spark.sql("CREATE TABLE tab1 (name STRING, age INT) USING parquet")
+        spark.sql("CREATE TABLE some_db.tab2 (name STRING, age INT) USING parquet")
         tables = sorted(spark.catalog.listTables(), key=lambda t: t.name)
         tablesDefault = sorted(spark.catalog.listTables("default"), key=lambda t: t.name)
         tablesSomeDb = sorted(spark.catalog.listTables("some_db"), key=lambda t: t.name)
@@ -1796,8 +1796,8 @@ def test_list_columns(self):
         spark = self.spark
         spark.catalog._reset()
         spark.sql("CREATE DATABASE some_db")
-        spark.sql("CREATE TABLE tab1 (name STRING, age INT)")
-        spark.sql("CREATE TABLE some_db.tab2 (nickname STRING, tolerance FLOAT)")
+        spark.sql("CREATE TABLE tab1 (name STRING, age INT) USING parquet")
+        spark.sql("CREATE TABLE some_db.tab2 (nickname STRING, tolerance FLOAT) USING parquet")
         columns = sorted(spark.catalog.listColumns("tab1"), key=lambda c: c.name)
         columnsDefault = sorted(spark.catalog.listColumns("tab1", "default"), key=lambda c: c.name)
         self.assertEquals(columns, columnsDefault)
```

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala

Lines changed: 0 additions & 16 deletions

```diff
@@ -370,22 +370,6 @@ trait CheckAnalysis extends PredicateHelper {
              |Conflicting attributes: ${conflictingAttributes.mkString(",")}
            """.stripMargin)
 
-      case s: SimpleCatalogRelation =>
-        failAnalysis(
-          s"""
-             |Hive support is required to select over the following tables:
-             |${s.catalogTable.identifier}
-           """.stripMargin)
-
-      // TODO: We need to consolidate this kind of checks for InsertIntoTable
-      // with the rule of PreWriteCheck defined in extendedCheckRules.
-      case InsertIntoTable(s: SimpleCatalogRelation, _, _, _, _) =>
-        failAnalysis(
-          s"""
-             |Hive support is required to insert into the following tables:
-             |${s.catalogTable.identifier}
-           """.stripMargin)
-
       case InsertIntoTable(t, _, _, _, _)
         if !t.isInstanceOf[LeafNode] ||
           t.isInstanceOf[Range] ||
```

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala

Lines changed: 10 additions & 10 deletions

```diff
@@ -156,8 +156,7 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
 
   test("the table type of an external table should be EXTERNAL_TABLE") {
     val catalog = newBasicCatalog()
-    val table =
-      newTable("external_table1", "db2").copy(tableType = CatalogTableType.EXTERNAL)
+    val table = newTable("external_table1", "db2").copy(tableType = CatalogTableType.EXTERNAL)
     catalog.createTable(table, ignoreIfExists = false)
     val actual = catalog.getTable("db2", "external_table1")
     assert(actual.tableType === CatalogTableType.EXTERNAL)
@@ -278,7 +277,7 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
       schema = new StructType()
        .add("HelLo", "int", nullable = false)
        .add("WoRLd", "int", nullable = true),
-      provider = Some("hive"),
+      provider = Some(defaultProvider),
      partitionColumnNames = Seq("WoRLd"),
      bucketSpec = Some(BucketSpec(4, Seq("HelLo"), Nil)))
    catalog.createTable(tbl, ignoreIfExists = false)
@@ -330,7 +329,7 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
       .add("col2", "string")
       .add("partCol1", "int")
       .add("partCol2", "string"),
-      provider = Some("hive"),
+      provider = Some(defaultProvider),
      partitionColumnNames = Seq("partCol1", "partCol2"))
    catalog.createTable(table, ignoreIfExists = false)
 
@@ -357,7 +356,7 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
       .add("col2", "string")
       .add("partCol1", "int")
       .add("partCol2", "string"),
-      provider = Some("hive"),
+      provider = Some(defaultProvider),
      partitionColumnNames = Seq("partCol1", "partCol2"))
    catalog.createTable(table, ignoreIfExists = false)
 
@@ -505,7 +504,7 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
       .add("col2", "string")
       .add("partCol1", "int")
       .add("partCol2", "string"),
-      provider = Some("hive"),
+      provider = Some(defaultProvider),
      partitionColumnNames = Seq("partCol1", "partCol2"))
    catalog.createTable(table, ignoreIfExists = false)
 
@@ -726,7 +725,7 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
      tableType = CatalogTableType.MANAGED,
      storage = CatalogStorageFormat.empty,
      schema = new StructType().add("a", "int").add("b", "string"),
-      provider = Some("hive")
+      provider = Some(defaultProvider)
    )
 
    catalog.createTable(table, ignoreIfExists = false)
@@ -746,7 +745,7 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
        Some(Utils.createTempDir().getAbsolutePath),
        None, None, None, false, Map.empty),
      schema = new StructType().add("a", "int").add("b", "string"),
-      provider = Some("hive")
+      provider = Some(defaultProvider)
    )
    catalog.createTable(externalTable, ignoreIfExists = false)
    assert(!exists(db.locationUri, "external_table"))
@@ -763,7 +762,7 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
       .add("col2", "string")
       .add("partCol1", "int")
       .add("partCol2", "string"),
-      provider = Some("hive"),
+      provider = Some(defaultProvider),
      partitionColumnNames = Seq("partCol1", "partCol2"))
    catalog.createTable(table, ignoreIfExists = false)
 
@@ -829,6 +828,7 @@ abstract class CatalogTestUtils {
   // Unimplemented methods
   val tableInputFormat: String
   val tableOutputFormat: String
+  val defaultProvider: String
   def newEmptyCatalog(): ExternalCatalog
 
   // These fields must be lazy because they rely on fields that are not implemented yet
@@ -901,7 +901,7 @@
       .add("col2", "string")
       .add("a", "int")
       .add("b", "string"),
-      provider = Some("hive"),
+      provider = Some(defaultProvider),
      partitionColumnNames = Seq("a", "b"),
      bucketSpec = Some(BucketSpec(4, Seq("col1"), Nil)))
  }
```
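The new abstract `defaultProvider` keeps the shared suite provider-agnostic: the in-memory suites below plug in `parquet`, while a Hive-backed suite could keep exercising Hive-format tables. A hedged sketch of such a counterpart (this suite is not part of the diff; the class names and formats here are assumptions):

```scala
// Hypothetical Hive-side override, not in this commit's diff: a suite backed by
// a Hive catalog would keep testing Hive tables just by overriding defaultProvider.
protected override val utils: CatalogTestUtils = new CatalogTestUtils {
  override val tableInputFormat: String = "org.apache.hadoop.mapred.SequenceFileInputFormat"
  override val tableOutputFormat: String = "org.apache.hadoop.mapred.SequenceFileOutputFormat"
  override val defaultProvider: String = "hive" // assumption: Hive suites keep the hive provider
  override def newEmptyCatalog(): ExternalCatalog = ??? // e.g. a Hive-backed ExternalCatalog
}
```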

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalogSuite.scala

Lines changed: 1 addition & 0 deletions

```diff
@@ -24,6 +24,7 @@ class InMemoryCatalogSuite extends ExternalCatalogSuite {
   protected override val utils: CatalogTestUtils = new CatalogTestUtils {
     override val tableInputFormat: String = "org.apache.park.SequenceFileInputFormat"
     override val tableOutputFormat: String = "org.apache.park.SequenceFileOutputFormat"
+    override val defaultProvider: String = "parquet"
     override def newEmptyCatalog(): ExternalCatalog = new InMemoryCatalog
   }
 
```
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala

Lines changed: 1 addition & 0 deletions

```diff
@@ -37,6 +37,7 @@ class SessionCatalogSuite extends PlanTest {
   private val utils = new CatalogTestUtils {
     override val tableInputFormat: String = "com.fruit.eyephone.CameraInputFormat"
     override val tableOutputFormat: String = "com.fruit.eyephone.CameraOutputFormat"
+    override val defaultProvider: String = "parquet"
     override def newEmptyCatalog(): ExternalCatalog = new InMemoryCatalog
   }
 
```
sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala

Lines changed: 3 additions & 4 deletions

```diff
@@ -60,7 +60,7 @@ case class CreateDataSourceTableCommand(table: CatalogTable, ignoreIfExists: Boo
       identifier = table.identifier.copy(
         database = Some(
           table.identifier.database.getOrElse(sessionState.catalog.getCurrentDatabase))),
-      tracksPartitionsInCatalog = sparkSession.sessionState.conf.manageFilesourcePartitions)
+      tracksPartitionsInCatalog = sessionState.conf.manageFilesourcePartitions)
     val dataSource: BaseRelation =
       DataSource(
         sparkSession = sparkSession,
@@ -89,7 +89,7 @@ case class CreateDataSourceTableCommand(table: CatalogTable, ignoreIfExists: Boo
       // partition provider hive, but no partitions in the metastore. The user has to call
       // `msck repair table` to populate the table partitions.
       tracksPartitionsInCatalog = partitionColumnNames.nonEmpty &&
-        sparkSession.sessionState.conf.manageFilesourcePartitions)
+        sessionState.conf.manageFilesourcePartitions)
     // We will return Nil or throw exception at the beginning if the table already exists, so when
     // we reach here, the table should not exist and we should set `ignoreIfExists` to false.
     sessionState.catalog.createTable(newTable, ignoreIfExists = false)
@@ -163,8 +163,7 @@ case class CreateDataSourceTableAsSelectCommand(
       case fs: HadoopFsRelation if table.partitionColumnNames.nonEmpty &&
           sparkSession.sqlContext.conf.manageFilesourcePartitions =>
         // Need to recover partitions into the metastore so our saved data is visible.
-        sparkSession.sessionState.executePlan(
-          AlterTableRecoverPartitionsCommand(table.identifier)).toRdd
+        sessionState.executePlan(AlterTableRecoverPartitionsCommand(table.identifier)).toRdd
       case _ =>
     }
   }
```

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala

Lines changed: 2 additions & 1 deletion

```diff
@@ -409,7 +409,8 @@ object HiveOnlyCheck extends (LogicalPlan => Unit) {
     plan.foreach {
       case CreateTable(tableDesc, _, Some(_)) if DDLUtils.isHiveTable(tableDesc) =>
         throw new AnalysisException("Hive support is required to use CREATE Hive TABLE AS SELECT")
-
+      case CreateTable(tableDesc, _, _) if DDLUtils.isHiveTable(tableDesc) =>
+        throw new AnalysisException("Hive support is required to CREATE Hive TABLE")
       case _ => // OK
     }
   }
```
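With the added case, `HiveOnlyCheck` now rejects plain CREATE TABLE as well as CTAS for Hive source tables. A test-style sketch of the two error paths, assuming ScalaTest and a SparkSession `spark` without Hive support in scope:

```scala
import org.apache.spark.sql.AnalysisException
import org.scalatest.Assertions.intercept

// Plain CREATE of a Hive source table (no USING clause): the new case in this patch.
val e1 = intercept[AnalysisException] {
  spark.sql("CREATE TABLE h1 (a INT)")
}
assert(e1.getMessage.contains("Hive support is required to CREATE Hive TABLE"))

// CTAS of a Hive source table: already rejected before this patch.
val e2 = intercept[AnalysisException] {
  spark.sql("CREATE TABLE h2 AS SELECT 1 AS a")
}
assert(e2.getMessage.contains("CREATE Hive TABLE AS SELECT"))
```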

sql/core/src/test/resources/sql-tests/inputs/change-column.sql

Lines changed: 2 additions & 2 deletions

```diff
@@ -1,5 +1,5 @@
 -- Create the origin table
-CREATE TABLE test_change(a INT, b STRING, c INT);
+CREATE TABLE test_change(a INT, b STRING, c INT) using parquet;
 DESC test_change;
 
 -- Change column name (not supported yet)
@@ -47,7 +47,7 @@ CREATE GLOBAL TEMPORARY VIEW global_temp_view(a, b) AS SELECT 1, "one";
 ALTER TABLE global_temp.global_temp_view CHANGE a a INT COMMENT 'this is column a';
 
 -- Change column in partition spec (not supported yet)
-CREATE TABLE partition_table(a INT, b STRING) PARTITIONED BY (c INT, d STRING);
+CREATE TABLE partition_table(a INT, b STRING, c INT, d STRING) USING parquet PARTITIONED BY (c, d);
 ALTER TABLE partition_table PARTITION (c = 1) CHANGE COLUMN a new_a INT;
 
 -- DROP TEST TABLE
```
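Note the syntax shift in the updated tests: Hive DDL declares partition column types inside PARTITIONED BY, while data source tables declare every column in the schema and reference partition columns by name only. A hedged sketch of the equivalent call (table name hypothetical, `spark` assumed in scope):

```scala
// Hive-format DDL (now requires Hive support):
//   CREATE TABLE pt (a INT, b STRING) PARTITIONED BY (c INT, d STRING)

// Data source DDL: all columns in the schema; PARTITIONED BY lists names only.
spark.sql(
  """CREATE TABLE pt (a INT, b STRING, c INT, d STRING)
    |USING parquet
    |PARTITIONED BY (c, d)""".stripMargin)
```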

sql/core/src/test/resources/sql-tests/inputs/describe.sql

Lines changed: 1 addition & 1 deletion

```diff
@@ -1,4 +1,4 @@
-CREATE TABLE t (a STRING, b INT) PARTITIONED BY (c STRING, d STRING);
+CREATE TABLE t (a STRING, b INT, c STRING, d STRING) USING parquet PARTITIONED BY (c, d);
 
 ALTER TABLE t ADD PARTITION (c='Us', d=1);
 
```

sql/core/src/test/resources/sql-tests/inputs/show-tables.sql

Lines changed: 2 additions & 2 deletions

```diff
@@ -1,9 +1,9 @@
 -- Test data.
 CREATE DATABASE showdb;
 USE showdb;
-CREATE TABLE show_t1(a String, b Int) PARTITIONED BY (c String, d String);
+CREATE TABLE show_t1(a String, b Int, c String, d String) USING parquet PARTITIONED BY (c, d);
 ALTER TABLE show_t1 ADD PARTITION (c='Us', d=1);
-CREATE TABLE show_t2(b String, d Int);
+CREATE TABLE show_t2(b String, d Int) USING parquet;
 CREATE TEMPORARY VIEW show_t3(e int) USING parquet;
 CREATE GLOBAL TEMP VIEW show_t4 AS SELECT 1 as col1;
 
```
99
