From 4b0b7b4f12a5e96dfaf272ab00a93a8b10590fb0 Mon Sep 17 00:00:00 2001
From: Tejas Patil
Date: Thu, 29 Sep 2016 10:25:11 -0700
Subject: [PATCH 1/2] Enable creating hive bucketed tables

---
 .../spark/sql/execution/SparkSqlParser.scala  |  6 +-
 .../spark/sql/hive/HiveMetastoreCatalog.scala |  2 +-
 .../spark/sql/hive/MetastoreRelation.scala    | 18 ++++-
 .../sql/hive/client/HiveClientImpl.scala      | 65 +++++++++++++++----
 .../hive/execution/InsertIntoHiveTable.scala  | 27 +++++++-
 .../spark/sql/hive/HiveDDLCommandSuite.scala  | 33 ++++++++--
 .../sql/hive/InsertIntoHiveTableSuite.scala   | 47 ++++++++++++++
 .../sql/hive/execution/HiveDDLSuite.scala     | 19 ++++++
 8 files changed, 191 insertions(+), 26 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
index 5359cedc8097..f751798337c2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
@@ -965,14 +965,13 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
     if (ctx.skewSpec != null) {
       operationNotAllowed("CREATE TABLE ... SKEWED BY", ctx)
     }
-    if (ctx.bucketSpec != null) {
-      operationNotAllowed("CREATE TABLE ... CLUSTERED BY", ctx)
-    }
+
     val comment = Option(ctx.STRING).map(string)
     val dataCols = Option(ctx.columns).map(visitColTypeList).getOrElse(Nil)
     val partitionCols = Option(ctx.partitionColumns).map(visitColTypeList).getOrElse(Nil)
     val properties = Option(ctx.tablePropertyList).map(visitPropertyKeyValues).getOrElse(Map.empty)
     val selectQuery = Option(ctx.query).map(plan)
+    val bucketSpec = Option(ctx.bucketSpec()).map(visitBucketSpec)
 
     // Note: Hive requires partition columns to be distinct from the schema, so we need
     // to include the partition columns here explicitly
@@ -1025,6 +1024,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
         tableType = tableType,
         storage = storage,
         schema = schema,
+        bucketSpec = bucketSpec,
         provider = Some("hive"),
         partitionColumnNames = partitionCols.map(_.name),
         properties = properties,
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 8410a2e4a47c..1f888d663e21 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -195,7 +195,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
     val metastoreSchema = StructType.fromAttributes(metastoreRelation.output)
     val tableIdentifier = QualifiedTableName(metastoreRelation.databaseName,
       metastoreRelation.tableName)
-    val bucketSpec = None  // We don't support hive bucketed tables, only ones we write out.
+    val bucketSpec = metastoreRelation.catalogTable.bucketSpec
 
     val result = if (metastoreRelation.hiveQlTable.isPartitioned) {
       val partitionSchema = StructType.fromAttributes(metastoreRelation.partitionKeys)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala
index 33f0ecff6352..940c11ea098c 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala
@@ -25,8 +25,9 @@ import com.google.common.base.Objects
 import org.apache.hadoop.fs.FileSystem
 import org.apache.hadoop.hive.common.StatsSetupConst
 import org.apache.hadoop.hive.metastore.{TableType => HiveTableType}
-import org.apache.hadoop.hive.metastore.api.FieldSchema
+import org.apache.hadoop.hive.metastore.api.{FieldSchema, Order}
 import org.apache.hadoop.hive.ql.metadata.{Partition, Table => HiveTable}
+import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.HIVE_COLUMN_ORDER_ASC
 import org.apache.hadoop.hive.ql.plan.TableDesc
 
 import org.apache.spark.sql.SparkSession
@@ -93,6 +94,21 @@ private[hive] case class MetastoreRelation(
     sd.setCols(schema.asJava)
     tTable.setPartitionKeys(partCols.asJava)
 
+    catalogTable.bucketSpec match {
+      case Some(bucketSpec) =>
+        sd.setNumBuckets(bucketSpec.numBuckets)
+        sd.setBucketCols(bucketSpec.bucketColumnNames.toList.asJava)
+
+        if (bucketSpec.sortColumnNames.nonEmpty) {
+          sd.setSortCols(
+            bucketSpec.sortColumnNames
+              .map(col => new Order(col, HIVE_COLUMN_ORDER_ASC))
+              .asJava
+          )
+        }
+      case _ =>
+    }
+
     catalogTable.storage.locationUri.foreach(sd.setLocation)
     catalogTable.storage.inputFormat.foreach(sd.setInputFormat)
     catalogTable.storage.outputFormat.foreach(sd.setOutputFormat)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index dd33d750a4d4..e5f1f9565d4c 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -27,10 +27,11 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.hive.conf.HiveConf
 import org.apache.hadoop.hive.metastore.{TableType => HiveTableType}
-import org.apache.hadoop.hive.metastore.api.{Database => HiveDatabase, FieldSchema}
+import org.apache.hadoop.hive.metastore.api.{Database => HiveDatabase, FieldSchema, Order}
 import org.apache.hadoop.hive.metastore.api.{SerDeInfo, StorageDescriptor}
 import org.apache.hadoop.hive.ql.Driver
 import org.apache.hadoop.hive.ql.metadata.{Hive, Partition => HivePartition, Table => HiveTable}
+import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.HIVE_COLUMN_ORDER_ASC
 import org.apache.hadoop.hive.ql.processors._
 import org.apache.hadoop.hive.ql.session.SessionState
 import org.apache.hadoop.security.UserGroupInformation
@@ -356,12 +357,43 @@ private[hive] class HiveClientImpl(
       tableName: String): Option[CatalogTable] = withHiveState {
     logDebug(s"Looking up $dbName.$tableName")
     Option(client.getTable(dbName, tableName, false)).map { h =>
+      val cols = h.getCols.asScala.map(fromHiveColumn)
+      val partCols = h.getPartCols.asScala.map(fromHiveColumn)
+
       // Note: Hive separates partition columns and the schema, but for us the
       // partition columns are part of the schema
-      val partCols = h.getPartCols.asScala.map(fromHiveColumn)
-      val schema = StructType(h.getCols.asScala.map(fromHiveColumn) ++ partCols)
+      val schema = StructType(cols ++ partCols)
+
+      val bucketSpec = if (h.getNumBuckets > 0) {
+        val sortColumnOrders = h.getSortCols.asScala
+        // Currently Spark only supports columns sorted in ascending order, whereas Hive
+        // supports both ascending and descending order. Propagate the sort information to
+        // downstream processing / optimizations in Spark only if all the columns are
+        // sorted in ascending order.
+        // TODO: In the future, Spark can support columns sorted in descending order
+        val allAscendingSorted =
+          sortColumnOrders.forall(_.getOrder == HIVE_COLUMN_ORDER_ASC)
+
+        val sortColumnNames = if (allAscendingSorted) {
+          sortColumnOrders.map { sortOrder =>
+            val columnName = sortOrder.getCol
+
+            if (!cols.exists(_.name.equalsIgnoreCase(columnName))) {
+              throw new AnalysisException(s"No match found for sort column name = $columnName " +
+                s"in table $dbName.$tableName. " +
+                s"Known table columns are ${cols.mkString("[", ", ", "]")}")
+            }
+            columnName
+          }
+        } else {
+          Seq()
+        }
+        Option(BucketSpec(h.getNumBuckets, h.getBucketCols.asScala, sortColumnNames))
+      } else {
+        None
+      }
 
-      // Skew spec, storage handler, and bucketing info can't be mapped to CatalogTable (yet)
+      // Skew spec and storage handler can't be mapped to CatalogTable (yet)
       val unsupportedFeatures = ArrayBuffer.empty[String]
 
       if (!h.getSkewedColNames.isEmpty) {
@@ -372,10 +404,6 @@ private[hive] class HiveClientImpl(
         unsupportedFeatures += "storage handler"
       }
 
-      if (!h.getBucketCols.isEmpty) {
-        unsupportedFeatures += "bucketing"
-      }
-
       val properties = Option(h.getParameters).map(_.asScala.toMap).orNull
 
       CatalogTable(
@@ -389,9 +417,7 @@ private[hive] class HiveClientImpl(
         },
         schema = schema,
         partitionColumnNames = partCols.map(_.name),
-        // We can not populate bucketing information for Hive tables as Spark SQL has a different
-        // implementation of hash function from Hive.
-        bucketSpec = None,
+        bucketSpec = bucketSpec,
         owner = h.getOwner,
         createTime = h.getTTable.getCreateTime.toLong * 1000,
         lastAccessTime = h.getLastAccessTime.toLong * 1000,
@@ -798,6 +824,23 @@ private[hive] class HiveClientImpl(
     table.comment.foreach { c => hiveTable.setProperty("comment", c) }
     table.viewOriginalText.foreach { t => hiveTable.setViewOriginalText(t) }
     table.viewText.foreach { t => hiveTable.setViewExpandedText(t) }
+
+    table.bucketSpec match {
+      case Some(bucketSpec) =>
+        hiveTable.setNumBuckets(bucketSpec.numBuckets)
+        hiveTable.setBucketCols(bucketSpec.bucketColumnNames.toList.asJava)
+
+        if (bucketSpec.sortColumnNames.nonEmpty) {
+          hiveTable.setSortCols(
+            bucketSpec.sortColumnNames
+              .map(col => new Order(col, HIVE_COLUMN_ORDER_ASC))
+              .toList
+              .asJava
+          )
+        }
+      case _ =>
+    }
+
     hiveTable
   }
 
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
index 53bb3b93db73..758441e5db82 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
@@ -20,11 +20,8 @@ package org.apache.spark.sql.hive.execution
 import java.io.IOException
 import java.net.URI
 import java.text.SimpleDateFormat
-import java.util
 import java.util.{Date, Random}
 
-import scala.collection.JavaConverters._
-
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.hadoop.hive.common.FileUtils
@@ -198,6 +195,30 @@ case class InsertIntoHiveTable(
       }
     }
 
+    table.catalogTable.bucketSpec match {
+      case Some(bucketSpec) =>
+        // We cannot populate bucketing information for Hive tables because Spark SQL uses a
+        // different hash function implementation than Hive.
+        // Hive-native hashing will be supported after SPARK-17495. Until then, writes to bucketed
+        // tables are allowed only if the user does not care about maintaining the table's
+        // bucketing, i.e. both "hive.enforce.bucketing" and "hive.enforce.sorting" are false.
+
+        val enforceBucketingConfig = "hive.enforce.bucketing"
+        val enforceSortingConfig = "hive.enforce.sorting"
+
+        val message = s"Output Hive table ${table.catalogTable.identifier} is bucketed but Spark " +
+          "currently does NOT populate bucketed output which is compatible with Hive."
+
+        if (hadoopConf.get(enforceBucketingConfig, "false").toBoolean ||
+            hadoopConf.get(enforceSortingConfig, "false").toBoolean) {
+          throw new AnalysisException(message)
+        } else {
+          logWarning(message + s" Inserting data anyway since both $enforceBucketingConfig and " +
+            s"$enforceSortingConfig are set to false.")
+        }
+      case _ => // do nothing since table has no bucketing
+    }
+
     val jobConf = new JobConf(hadoopConf)
     val jobConfSer = new SerializableJobConf(jobConf)
 
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala
index 54e27b6f7350..3fdbc5ad49a3 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala
@@ -352,13 +352,32 @@ class HiveDDLCommandSuite extends PlanTest {
   }
 
   test("create table - clustered by") {
-    val baseQuery = "CREATE TABLE my_table (id int, name string) CLUSTERED BY(id)"
-    val query1 = s"$baseQuery INTO 10 BUCKETS"
-    val query2 = s"$baseQuery SORTED BY(id) INTO 10 BUCKETS"
-    val e1 = intercept[ParseException] { parser.parsePlan(query1) }
-    val e2 = intercept[ParseException] { parser.parsePlan(query2) }
-    assert(e1.getMessage.contains("Operation not allowed"))
-    assert(e2.getMessage.contains("Operation not allowed"))
+    val numBuckets = 10
+    val bucketedColumn = "id"
+    val sortColumn = "id"
+    val baseQuery =
+      s"""
+         CREATE TABLE my_table (
+           $bucketedColumn int,
+           name string)
+         CLUSTERED BY($bucketedColumn)
+       """
+
+    val query1 = s"$baseQuery INTO $numBuckets BUCKETS"
+    val (desc1, _) = extractTableDesc(query1)
+    assert(desc1.bucketSpec.isDefined)
+    val bucketSpec1 = desc1.bucketSpec.get
+    assert(bucketSpec1.numBuckets == numBuckets)
+    assert(bucketSpec1.bucketColumnNames.head.equals(bucketedColumn))
+    assert(bucketSpec1.sortColumnNames.isEmpty)
+
+    val query2 = s"$baseQuery SORTED BY($sortColumn) INTO $numBuckets BUCKETS"
+    val (desc2, _) = extractTableDesc(query2)
+    assert(desc2.bucketSpec.isDefined)
+    val bucketSpec2 = desc2.bucketSpec.get
+    assert(bucketSpec2.numBuckets == numBuckets)
+    assert(bucketSpec2.bucketColumnNames.head.equals(bucketedColumn))
+    assert(bucketSpec2.sortColumnNames.head.equals(sortColumn))
   }
 
   test("create table - skewed by") {
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala
index d9ce1c3dc18f..d40641750dfa 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala
@@ -517,4 +517,51 @@ class InsertIntoHiveTableSuite extends QueryTest with TestHiveSingleton with Bef
       spark.table("t").write.insertInto(tableName)
     }
   }
+
+  private def testBucketedTable(testName: String)(f: String => Unit): Unit = {
+    test(s"Hive SerDe table - $testName") {
+      val hiveTable = "hive_table"
+
+      withTable(hiveTable) {
+        withSQLConf("hive.exec.dynamic.partition.mode" -> "nonstrict") {
+          sql(
+            s"""
+              |CREATE TABLE $hiveTable (a INT, d INT)
+              |PARTITIONED BY (b INT, c INT)
+              |CLUSTERED BY(a)
+              |SORTED BY(a, d) INTO 256 BUCKETS
+              |STORED AS TEXTFILE
+            """.stripMargin)
+          f(hiveTable)
+        }
+      }
+    }
+  }
+
+  testBucketedTable("INSERT should NOT fail if strict bucketing is NOT enforced") {
+    tableName =>
+      withSQLConf("hive.enforce.bucketing" -> "false", "hive.enforce.sorting" -> "false") {
+        sql(s"INSERT INTO TABLE $tableName SELECT 1, 4, 2 AS c, 3 AS b")
+        checkAnswer(sql(s"SELECT a, b, c, d FROM $tableName"), Row(1, 2, 3, 4))
+      }
+  }
+
+  testBucketedTable("INSERT should fail if strict bucketing / sorting is enforced") {
+    tableName =>
+      withSQLConf("hive.enforce.bucketing" -> "true", "hive.enforce.sorting" -> "false") {
+        intercept[AnalysisException] {
+          sql(s"INSERT INTO TABLE $tableName SELECT 1, 2, 3, 4")
+        }
+      }
+      withSQLConf("hive.enforce.bucketing" -> "false", "hive.enforce.sorting" -> "true") {
+        intercept[AnalysisException] {
+          sql(s"INSERT INTO TABLE $tableName SELECT 1, 2, 3, 4")
+        }
+      }
+      withSQLConf("hive.enforce.bucketing" -> "true", "hive.enforce.sorting" -> "true") {
+        intercept[AnalysisException] {
+          sql(s"INSERT INTO TABLE $tableName SELECT 1, 2, 3, 4")
+        }
+      }
+  }
 }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index 751e976c7b90..f5ba08c95a0b 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -506,6 +506,25 @@ class HiveDDLSuite
     }
   }
 
+  test("desc table for Hive table - bucketed + sorted table") {
+    withTable("tbl") {
+      sql(s"""
+        CREATE TABLE tbl (id int, name string)
+        PARTITIONED BY (ds string)
+        CLUSTERED BY(id)
+        SORTED BY(id, name) INTO 1024 BUCKETS
+        """)
+
+      assert(sql("DESC FORMATTED tbl").collect().containsSlice(
+        Seq(
+          Row("Num Buckets", "8", null),
+          Row("Bucket Columns", "id1", null),
+          Row("Sort Columns", "id, name", null)
+        )
+      ))
+    }
+  }
+
   test("desc formatted table for permanent view") {
     withTable("tbl") {
       withView("view1") {

From 136933219c98bd5e23170ceddae618a74d4d796d Mon Sep 17 00:00:00 2001
From: Tejas Patil
Date: Thu, 29 Sep 2016 17:53:57 -0700
Subject: [PATCH 2/2] fix test case failures

---
 .../apache/spark/sql/execution/command/tables.scala  | 11 ++++++++---
 .../apache/spark/sql/hive/ShowCreateTableSuite.scala | 11 +++--------
 .../spark/sql/hive/execution/HiveDDLSuite.scala      |  6 +++---
 3 files changed, 14 insertions(+), 14 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
index 6a91c997bac6..fd8f3eec7be6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
@@ -767,9 +767,14 @@ case class ShowCreateTableCommand(table: TableIdentifier) extends RunnableComman
       builder ++= partCols.mkString("PARTITIONED BY (", ", ", ")\n")
     }
 
-    if (metadata.bucketSpec.isDefined) {
-      throw new UnsupportedOperationException(
-        "Creating Hive table with bucket spec is not supported yet.")
+    if (metadata.bucketSpec.nonEmpty) {
+      val bucketSpec = metadata.bucketSpec.get
+      builder ++= s"CLUSTERED BY (${bucketSpec.bucketColumnNames.mkString(",")})\n"
+
+      if (bucketSpec.sortColumnNames.nonEmpty) {
+        builder ++= s"SORTED BY (${bucketSpec.sortColumnNames.map(_ + " ASC").mkString(", ")})\n"
+      }
+      builder ++= s"INTO ${bucketSpec.numBuckets} BUCKETS\n"
     }
   }
 
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/ShowCreateTableSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/ShowCreateTableSuite.scala
index e925921165d6..056f4caffcf5 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/ShowCreateTableSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/ShowCreateTableSuite.scala
@@ -247,21 +247,16 @@ class ShowCreateTableSuite extends QueryTest with SQLTestUtils with TestHiveSing
     }
   }
 
-  test("hive bucketing is not supported") {
+  test("hive bucketing is supported") {
     withTable("t1") {
-      createRawHiveTable(
+      sql(
         s"""CREATE TABLE t1 (a INT, b STRING)
            |CLUSTERED BY (a)
            |SORTED BY (b)
            |INTO 2 BUCKETS
          """.stripMargin
       )
-
-      val cause = intercept[AnalysisException] {
-        sql("SHOW CREATE TABLE t1")
-      }
-
-      assert(cause.getMessage.contains(" - bucketing"))
+      checkCreateTable("t1")
     }
   }
 
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index f5ba08c95a0b..86c3f90c05aa 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -517,9 +517,9 @@ class HiveDDLSuite
 
       assert(sql("DESC FORMATTED tbl").collect().containsSlice(
         Seq(
-          Row("Num Buckets", "8", null),
-          Row("Bucket Columns", "id1", null),
-          Row("Sort Columns", "id, name", null)
+          Row("Num Buckets:", "1024", ""),
+          Row("Bucket Columns:", "[id]", ""),
+          Row("Sort Columns:", "[id, name]", "")
        )
      ))
    }
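
Editor's note (not part of either patch): a minimal usage sketch of the behaviour these two commits enable. The table name is illustrative, and `spark` is assumed to be a SparkSession built with Hive support (enableHiveSupport()).

    // DDL that previously failed in the parser with "Operation not allowed: CREATE TABLE ... CLUSTERED BY"
    spark.sql(
      """CREATE TABLE bucketed_src (id INT, name STRING)
        |CLUSTERED BY (id) SORTED BY (id) INTO 8 BUCKETS
        |STORED AS TEXTFILE""".stripMargin)

    // Inserts succeed only while Hive's enforcement flags stay false, because Spark's hash
    // function still differs from Hive's (SPARK-17495); otherwise InsertIntoHiveTable throws
    // an AnalysisException. The suite above toggles the same keys via withSQLConf.
    spark.conf.set("hive.enforce.bucketing", "false")
    spark.conf.set("hive.enforce.sorting", "false")
    spark.sql("INSERT INTO TABLE bucketed_src SELECT 1, 'a'")

    // SHOW CREATE TABLE now round-trips the CLUSTERED BY / SORTED BY / INTO ... BUCKETS clause
    // instead of throwing UnsupportedOperationException.
    spark.sql("SHOW CREATE TABLE bucketed_src").show(truncate = false)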