@@ -965,14 +965,13 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
if (ctx.skewSpec != null) {
operationNotAllowed("CREATE TABLE ... SKEWED BY", ctx)
}
if (ctx.bucketSpec != null) {
operationNotAllowed("CREATE TABLE ... CLUSTERED BY", ctx)
}

val comment = Option(ctx.STRING).map(string)
val dataCols = Option(ctx.columns).map(visitColTypeList).getOrElse(Nil)
val partitionCols = Option(ctx.partitionColumns).map(visitColTypeList).getOrElse(Nil)
val properties = Option(ctx.tablePropertyList).map(visitPropertyKeyValues).getOrElse(Map.empty)
val selectQuery = Option(ctx.query).map(plan)
val bucketSpec = Option(ctx.bucketSpec()).map(visitBucketSpec)

// Note: Hive requires partition columns to be distinct from the schema, so we need
// to include the partition columns here explicitly
@@ -1025,6 +1024,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
tableType = tableType,
storage = storage,
schema = schema,
bucketSpec = bucketSpec,
provider = Some("hive"),
partitionColumnNames = partitionCols.map(_.name),
properties = properties,
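With the parser change above, Hive bucketing DDL is captured as a BucketSpec on the CatalogTable instead of being rejected with "Operation not allowed". A minimal sketch of a statement that now parses (hypothetical table and column names):

// Hypothetical example: CLUSTERED BY / SORTED BY / INTO ... BUCKETS is now accepted
// on the Hive CREATE TABLE path and recorded as CatalogTable.bucketSpec.
spark.sql(
  """CREATE TABLE sales (id INT, amount DOUBLE)
    |CLUSTERED BY (id) SORTED BY (id) INTO 8 BUCKETS
    |STORED AS TEXTFILE
  """.stripMargin)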
@@ -767,9 +767,14 @@ case class ShowCreateTableCommand(table: TableIdentifier) extends RunnableCommand
builder ++= partCols.mkString("PARTITIONED BY (", ", ", ")\n")
}

if (metadata.bucketSpec.isDefined) {
throw new UnsupportedOperationException(
"Creating Hive table with bucket spec is not supported yet.")
if (metadata.bucketSpec.nonEmpty) {
val bucketSpec = metadata.bucketSpec.get
builder ++= s"CLUSTERED BY (${bucketSpec.bucketColumnNames.mkString(",")})\n"

if (bucketSpec.sortColumnNames.nonEmpty) {
builder ++= s"SORTED BY (${bucketSpec.sortColumnNames.map(_ + " ASC").mkString(", ")})\n"
}
builder ++= s"INTO ${bucketSpec.numBuckets} BUCKETS\n"
}
}

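For reference, a sketch of the fragment the builder above appends for a hypothetical bucket spec with two buckets on column a, sorted by column b:

CLUSTERED BY (a)
SORTED BY (b ASC)
INTO 2 BUCKETS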
@@ -195,7 +195,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Logging
val metastoreSchema = StructType.fromAttributes(metastoreRelation.output)
val tableIdentifier =
QualifiedTableName(metastoreRelation.databaseName, metastoreRelation.tableName)
val bucketSpec = None // We don't support hive bucketed tables, only ones we write out.
val bucketSpec = metastoreRelation.catalogTable.bucketSpec

val result = if (metastoreRelation.hiveQlTable.isPartitioned) {
val partitionSchema = StructType.fromAttributes(metastoreRelation.partitionKeys)
@@ -25,8 +25,9 @@ import com.google.common.base.Objects
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.hive.common.StatsSetupConst
import org.apache.hadoop.hive.metastore.{TableType => HiveTableType}
import org.apache.hadoop.hive.metastore.api.FieldSchema
import org.apache.hadoop.hive.metastore.api.{FieldSchema, Order}
import org.apache.hadoop.hive.ql.metadata.{Partition, Table => HiveTable}
import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.HIVE_COLUMN_ORDER_ASC
import org.apache.hadoop.hive.ql.plan.TableDesc

import org.apache.spark.sql.SparkSession
@@ -93,6 +94,21 @@ private[hive] case class MetastoreRelation(
sd.setCols(schema.asJava)
tTable.setPartitionKeys(partCols.asJava)

catalogTable.bucketSpec match {
case Some(bucketSpec) =>
sd.setNumBuckets(bucketSpec.numBuckets)
sd.setBucketCols(bucketSpec.bucketColumnNames.toList.asJava)

if (bucketSpec.sortColumnNames.nonEmpty) {
sd.setSortCols(
bucketSpec.sortColumnNames
.map(col => new Order(col, HIVE_COLUMN_ORDER_ASC))
.asJava
)
}
case _ =>
}

catalogTable.storage.locationUri.foreach(sd.setLocation)
catalogTable.storage.inputFormat.foreach(sd.setInputFormat)
catalogTable.storage.outputFormat.foreach(sd.setOutputFormat)
@@ -27,10 +27,11 @@ import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.metastore.{TableType => HiveTableType}
import org.apache.hadoop.hive.metastore.api.{Database => HiveDatabase, FieldSchema}
import org.apache.hadoop.hive.metastore.api.{Database => HiveDatabase, FieldSchema, Order}
import org.apache.hadoop.hive.metastore.api.{SerDeInfo, StorageDescriptor}
import org.apache.hadoop.hive.ql.Driver
import org.apache.hadoop.hive.ql.metadata.{Hive, Partition => HivePartition, Table => HiveTable}
import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.HIVE_COLUMN_ORDER_ASC
import org.apache.hadoop.hive.ql.processors._
import org.apache.hadoop.hive.ql.session.SessionState
import org.apache.hadoop.security.UserGroupInformation
@@ -356,12 +357,43 @@ private[hive] class HiveClientImpl(
tableName: String): Option[CatalogTable] = withHiveState {
logDebug(s"Looking up $dbName.$tableName")
Option(client.getTable(dbName, tableName, false)).map { h =>
val cols = h.getCols.asScala.map(fromHiveColumn)
val partCols = h.getPartCols.asScala.map(fromHiveColumn)

// Note: Hive separates partition columns and the schema, but for us the
// partition columns are part of the schema
val partCols = h.getPartCols.asScala.map(fromHiveColumn)
val schema = StructType(h.getCols.asScala.map(fromHiveColumn) ++ partCols)
val schema = StructType(cols ++ partCols)

val bucketSpec = if (h.getNumBuckets > 0) {
val sortColumnOrders = h.getSortCols.asScala
// Currently Spark only supports columns sorted in ascending order, while Hive
// supports both ascending and descending order. Only if all the columns are
// sorted in ascending order do we propagate the sortedness information to
// downstream processing / optimizations in Spark.
// TODO: In the future, Spark can also support columns sorted in descending order.
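// For example (hypothetical): a table declared with SORTED BY (a ASC, b DESC) ends up
// with an empty sortColumnNames below, while numBuckets and the bucket columns are
// still recorded in the resulting BucketSpec.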
val allAscendingSorted =
sortColumnOrders.forall(_.getOrder == HIVE_COLUMN_ORDER_ASC)

val sortColumnNames = if (allAscendingSorted) {
sortColumnOrders.map { sortOrder =>
val columnName = sortOrder.getCol

if (!cols.exists(_.name.equalsIgnoreCase(columnName))) {
throw new AnalysisException(s"No match found for sort column name = $columnName " +
s"in table $dbName.$tableName. " +
s"Known table columns are ${cols.mkString("[", ", ", "]")}")
}
columnName
}
} else {
Seq()
}
Option(BucketSpec(h.getNumBuckets, h.getBucketCols.asScala, sortColumnNames))
} else {
None
}

// Skew spec, storage handler, and bucketing info can't be mapped to CatalogTable (yet)
// Skew spec and storage handler can't be mapped to CatalogTable (yet)
val unsupportedFeatures = ArrayBuffer.empty[String]

if (!h.getSkewedColNames.isEmpty) {
@@ -372,10 +404,6 @@ private[hive] class HiveClientImpl(
unsupportedFeatures += "storage handler"
}

if (!h.getBucketCols.isEmpty) {
unsupportedFeatures += "bucketing"
}

val properties = Option(h.getParameters).map(_.asScala.toMap).orNull

CatalogTable(
@@ -389,9 +417,7 @@ private[hive] class HiveClientImpl(
},
schema = schema,
partitionColumnNames = partCols.map(_.name),
// We can not populate bucketing information for Hive tables as Spark SQL has a different
// implementation of hash function from Hive.
bucketSpec = None,
bucketSpec = bucketSpec,
owner = h.getOwner,
createTime = h.getTTable.getCreateTime.toLong * 1000,
lastAccessTime = h.getLastAccessTime.toLong * 1000,
@@ -798,6 +824,23 @@ private[hive] class HiveClientImpl(
table.comment.foreach { c => hiveTable.setProperty("comment", c) }
table.viewOriginalText.foreach { t => hiveTable.setViewOriginalText(t) }
table.viewText.foreach { t => hiveTable.setViewExpandedText(t) }

table.bucketSpec match {
case Some(bucketSpec) =>
hiveTable.setNumBuckets(bucketSpec.numBuckets)
hiveTable.setBucketCols(bucketSpec.bucketColumnNames.toList.asJava)

if (bucketSpec.sortColumnNames.nonEmpty) {
hiveTable.setSortCols(
bucketSpec.sortColumnNames
.map(col => new Order(col, HIVE_COLUMN_ORDER_ASC))
.toList
.asJava
)
}
case _ =>
}

hiveTable
}

@@ -20,11 +20,8 @@ package org.apache.spark.sql.hive.execution
import java.io.IOException
import java.net.URI
import java.text.SimpleDateFormat
import java.util
import java.util.{Date, Random}

import scala.collection.JavaConverters._

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.hive.common.FileUtils
@@ -198,6 +195,30 @@ case class InsertIntoHiveTable(
}
}

table.catalogTable.bucketSpec match {
case Some(bucketSpec) =>
// We cannot populate bucketing information for Hive tables because Spark SQL uses a
// different hash function implementation than Hive.
// Hive-native hashing will be supported after SPARK-17495. Until then, writes to bucketed
// tables are allowed only if the user does not care about maintaining the table's bucketing,
// i.e. both "hive.enforce.bucketing" and "hive.enforce.sorting" are set to false.

val enforceBucketingConfig = "hive.enforce.bucketing"
val enforceSortingConfig = "hive.enforce.sorting"

val message = s"Output Hive table ${table.catalogTable.identifier} is bucketed but Spark " +
"currently does NOT populate bucketed output which is compatible with Hive."

if (hadoopConf.get(enforceBucketingConfig, "false").toBoolean ||
hadoopConf.get(enforceSortingConfig, "false").toBoolean) {
[Review comment, Member] Are the default values (false) for these two configs safe? If the user isn't aware of them, incompatible data could be inserted into a bucketed Hive table.

[Author reply] @viirya: Even right now on trunk, if you try to insert data into a bucketed table it will just work without producing bucketed output. I don't want to break that for existing users by making these default to true. The eventual goal would be to drop these configs and have Spark always produce data adhering to the table's bucketing spec (without breaking existing pipelines).

throw new AnalysisException(message)
} else {
logWarning(message + s" Inserting data anyways since both $enforceBucketingConfig and " +
s"$enforceSortingConfig are set to false.")
}
case _ => // do nothing since table has no bucketing
}

val jobConf = new JobConf(hadoopConf)
val jobConfSer = new SerializableJobConf(jobConf)

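A hedged usage sketch of the check above (hypothetical table name and schema), mirroring the InsertIntoHiveTableSuite tests further down: with either enforcement config enabled the insert is rejected, while with both left at their default of false it proceeds with a warning.

// Hypothetical setup: bucketed_tbl is a Hive table created with
//   CLUSTERED BY (a) SORTED BY (a) INTO 8 BUCKETS
// and has four INT columns a, b, c, d.
spark.sql("SET hive.enforce.bucketing=true")
// This insert now fails with AnalysisException, since Spark cannot yet produce
// Hive-compatible bucketed output.
spark.sql("INSERT INTO TABLE bucketed_tbl SELECT 1, 2, 3, 4")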
@@ -352,13 +352,32 @@ class HiveDDLCommandSuite extends PlanTest {
}

test("create table - clustered by") {
val baseQuery = "CREATE TABLE my_table (id int, name string) CLUSTERED BY(id)"
val query1 = s"$baseQuery INTO 10 BUCKETS"
val query2 = s"$baseQuery SORTED BY(id) INTO 10 BUCKETS"
val e1 = intercept[ParseException] { parser.parsePlan(query1) }
val e2 = intercept[ParseException] { parser.parsePlan(query2) }
assert(e1.getMessage.contains("Operation not allowed"))
assert(e2.getMessage.contains("Operation not allowed"))
val numBuckets = 10
val bucketedColumn = "id"
val sortColumn = "id"
val baseQuery =
s"""
CREATE TABLE my_table (
$bucketedColumn int,
name string)
CLUSTERED BY($bucketedColumn)
"""

val query1 = s"$baseQuery INTO $numBuckets BUCKETS"
val (desc1, _) = extractTableDesc(query1)
assert(desc1.bucketSpec.isDefined)
val bucketSpec1 = desc1.bucketSpec.get
assert(bucketSpec1.numBuckets == numBuckets)
assert(bucketSpec1.bucketColumnNames.head.equals(bucketedColumn))
assert(bucketSpec1.sortColumnNames.isEmpty)

val query2 = s"$baseQuery SORTED BY($sortColumn) INTO $numBuckets BUCKETS"
val (desc2, _) = extractTableDesc(query2)
assert(desc2.bucketSpec.isDefined)
val bucketSpec2 = desc2.bucketSpec.get
assert(bucketSpec2.numBuckets == numBuckets)
assert(bucketSpec2.bucketColumnNames.head.equals(bucketedColumn))
assert(bucketSpec2.sortColumnNames.head.equals(sortColumn))
}

test("create table - skewed by") {
@@ -517,4 +517,51 @@ class InsertIntoHiveTableSuite extends QueryTest with TestHiveSingleton with BeforeAndAfter
spark.table("t").write.insertInto(tableName)
}
}

private def testBucketedTable(testName: String)(f: String => Unit): Unit = {
test(s"Hive SerDe table - $testName") {
val hiveTable = "hive_table"

withTable(hiveTable) {
withSQLConf("hive.exec.dynamic.partition.mode" -> "nonstrict") {
sql(
s"""
|CREATE TABLE $hiveTable (a INT, d INT)
|PARTITIONED BY (b INT, c INT)
|CLUSTERED BY(a)
|SORTED BY(a, d) INTO 256 BUCKETS
|STORED AS TEXTFILE
""".stripMargin)
f(hiveTable)
}
}
}
}

testBucketedTable("INSERT should NOT fail if strict bucketing is NOT enforced") {
tableName =>
withSQLConf("hive.enforce.bucketing" -> "false", "hive.enforce.sorting" -> "false") {
sql(s"INSERT INTO TABLE $tableName SELECT 1, 4, 2 AS c, 3 AS b")
checkAnswer(sql(s"SELECT a, b, c, d FROM $tableName"), Row(1, 2, 3, 4))
}
}

testBucketedTable("INSERT should fail if strict bucketing / sorting is enforced") {
tableName =>
withSQLConf("hive.enforce.bucketing" -> "true", "hive.enforce.sorting" -> "false") {
intercept[AnalysisException] {
sql(s"INSERT INTO TABLE $tableName SELECT 1, 2, 3, 4")
}
}
withSQLConf("hive.enforce.bucketing" -> "false", "hive.enforce.sorting" -> "true") {
intercept[AnalysisException] {
sql(s"INSERT INTO TABLE $tableName SELECT 1, 2, 3, 4")
}
}
withSQLConf("hive.enforce.bucketing" -> "true", "hive.enforce.sorting" -> "true") {
intercept[AnalysisException] {
sql(s"INSERT INTO TABLE $tableName SELECT 1, 2, 3, 4")
}
}
}
}
@@ -247,21 +247,16 @@ class ShowCreateTableSuite extends QueryTest with SQLTestUtils with TestHiveSingleton
}
}

test("hive bucketing is not supported") {
test("hive bucketing is supported") {
withTable("t1") {
createRawHiveTable(
sql(
s"""CREATE TABLE t1 (a INT, b STRING)
|CLUSTERED BY (a)
|SORTED BY (b)
|INTO 2 BUCKETS
""".stripMargin
)

val cause = intercept[AnalysisException] {
sql("SHOW CREATE TABLE t1")
}

assert(cause.getMessage.contains(" - bucketing"))
checkCreateTable("t1")
}
}

@@ -506,6 +506,25 @@ class HiveDDLSuite
}
}

test("desc table for Hive table - bucketed + sorted table") {
withTable("tbl") {
sql(s"""
CREATE TABLE tbl (id int, name string)
PARTITIONED BY (ds string)
CLUSTERED BY(id)
SORTED BY(id, name) INTO 1024 BUCKETS
""")

assert(sql("DESC FORMATTED tbl").collect().containsSlice(
Seq(
Row("Num Buckets:", "1024", ""),
Row("Bucket Columns:", "[id]", ""),
Row("Sort Columns:", "[id, name]", "")
)
))
}
}

test("desc formatted table for permanent view") {
withTable("tbl") {
withView("view1") {