
Commit d83dcc9

[SPARK-10672] [SQL] Do not fail when we cannot save the metadata of a data source table in a hive compatible way
https://issues.apache.org/jira/browse/SPARK-10672

With the changes in this PR, we fall back to saving the metadata of a table in a Spark SQL specific way if we fail to save it in a Hive compatible way (Hive throws an exception because of its internal restrictions, e.g. binary and decimal types cannot be saved to Parquet if the metastore is running Hive 0.13).

I manually tested the fix with the following test in `DataSourceWithHiveMetastoreCatalogSuite` (with `spark.sql.hive.metastore.version=0.13` and `spark.sql.hive.metastore.jars=maven`):

```
test(s"fail to save metadata of a parquet table in hive 0.13") {
  withTempPath { dir =>
    withTable("t") {
      val path = dir.getCanonicalPath

      sql(
        s"""CREATE TABLE t USING $provider
           |OPTIONS (path '$path')
           |AS SELECT 1 AS d1, cast("val_1" as binary) AS d2
         """.stripMargin)

      sql(
        s"""describe formatted t
         """.stripMargin).collect.foreach(println)

      sqlContext.table("t").show
    }
  }
}
```

Without this fix, we fail with the following error:

```
org.apache.hadoop.hive.ql.metadata.HiveException: java.lang.UnsupportedOperationException: Unknown field type: binary
  at org.apache.hadoop.hive.ql.metadata.Hive.createTable(Hive.java:619)
  at org.apache.hadoop.hive.ql.metadata.Hive.createTable(Hive.java:576)
  at org.apache.spark.sql.hive.client.ClientWrapper$$anonfun$createTable$1.apply$mcV$sp(ClientWrapper.scala:359)
  at org.apache.spark.sql.hive.client.ClientWrapper$$anonfun$createTable$1.apply(ClientWrapper.scala:357)
  at org.apache.spark.sql.hive.client.ClientWrapper$$anonfun$createTable$1.apply(ClientWrapper.scala:357)
  at org.apache.spark.sql.hive.client.ClientWrapper$$anonfun$withHiveState$1.apply(ClientWrapper.scala:256)
  at org.apache.spark.sql.hive.client.ClientWrapper.retryLocked(ClientWrapper.scala:211)
  at org.apache.spark.sql.hive.client.ClientWrapper.withHiveState(ClientWrapper.scala:248)
  at org.apache.spark.sql.hive.client.ClientWrapper.createTable(ClientWrapper.scala:357)
  at org.apache.spark.sql.hive.HiveMetastoreCatalog.createDataSourceTable(HiveMetastoreCatalog.scala:358)
  at org.apache.spark.sql.hive.execution.CreateMetastoreDataSourceAsSelect.run(commands.scala:285)
  at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult$lzycompute(commands.scala:57)
  at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult(commands.scala:57)
  at org.apache.spark.sql.execution.ExecutedCommand.doExecute(commands.scala:69)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:140)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:138)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
  at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:138)
  at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:58)
  at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:58)
  at org.apache.spark.sql.DataFrame.<init>(DataFrame.scala:144)
  at org.apache.spark.sql.DataFrame.<init>(DataFrame.scala:129)
  at org.apache.spark.sql.DataFrame$.apply(DataFrame.scala:51)
  at org.apache.spark.sql.SQLContext.sql(SQLContext.scala:725)
  at org.apache.spark.sql.test.SQLTestUtils$$anonfun$sql$1.apply(SQLTestUtils.scala:56)
  at org.apache.spark.sql.test.SQLTestUtils$$anonfun$sql$1.apply(SQLTestUtils.scala:56)
  at org.apache.spark.sql.hive.DataSourceWithHiveMetastoreCatalogSuite$$anonfun$4$$anonfun$apply$1$$anonfun$apply$mcV$sp$2$$anonfun$apply$2.apply$mcV$sp(HiveMetastoreCatalogSuite.scala:165)
  at org.apache.spark.sql.test.SQLTestUtils$class.withTable(SQLTestUtils.scala:150)
  at org.apache.spark.sql.hive.DataSourceWithHiveMetastoreCatalogSuite.withTable(HiveMetastoreCatalogSuite.scala:52)
  at org.apache.spark.sql.hive.DataSourceWithHiveMetastoreCatalogSuite$$anonfun$4$$anonfun$apply$1$$anonfun$apply$mcV$sp$2.apply(HiveMetastoreCatalogSuite.scala:162)
  at org.apache.spark.sql.hive.DataSourceWithHiveMetastoreCatalogSuite$$anonfun$4$$anonfun$apply$1$$anonfun$apply$mcV$sp$2.apply(HiveMetastoreCatalogSuite.scala:161)
  at org.apache.spark.sql.test.SQLTestUtils$class.withTempPath(SQLTestUtils.scala:125)
  at org.apache.spark.sql.hive.DataSourceWithHiveMetastoreCatalogSuite.withTempPath(HiveMetastoreCatalogSuite.scala:52)
  at org.apache.spark.sql.hive.DataSourceWithHiveMetastoreCatalogSuite$$anonfun$4$$anonfun$apply$1.apply$mcV$sp(HiveMetastoreCatalogSuite.scala:161)
  at org.apache.spark.sql.hive.DataSourceWithHiveMetastoreCatalogSuite$$anonfun$4$$anonfun$apply$1.apply(HiveMetastoreCatalogSuite.scala:161)
  at org.apache.spark.sql.hive.DataSourceWithHiveMetastoreCatalogSuite$$anonfun$4$$anonfun$apply$1.apply(HiveMetastoreCatalogSuite.scala:161)
  at org.scalatest.Transformer$$anonfun$apply$1.apply$mcV$sp(Transformer.scala:22)
  at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)
  at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
  at org.scalatest.Transformer.apply(Transformer.scala:22)
  at org.scalatest.Transformer.apply(Transformer.scala:20)
  at org.scalatest.FunSuiteLike$$anon$1.apply(FunSuiteLike.scala:166)
  at org.apache.spark.SparkFunSuite.withFixture(SparkFunSuite.scala:42)
  at org.scalatest.FunSuiteLike$class.invokeWithFixture$1(FunSuiteLike.scala:163)
  at org.scalatest.FunSuiteLike$$anonfun$runTest$1.apply(FunSuiteLike.scala:175)
  at org.scalatest.FunSuiteLike$$anonfun$runTest$1.apply(FunSuiteLike.scala:175)
  at org.scalatest.SuperEngine.runTestImpl(Engine.scala:306)
  at org.scalatest.FunSuiteLike$class.runTest(FunSuiteLike.scala:175)
  at org.scalatest.FunSuite.runTest(FunSuite.scala:1555)
  at org.scalatest.FunSuiteLike$$anonfun$runTests$1.apply(FunSuiteLike.scala:208)
  at org.scalatest.FunSuiteLike$$anonfun$runTests$1.apply(FunSuiteLike.scala:208)
  at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:413)
  at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:401)
  at scala.collection.immutable.List.foreach(List.scala:318)
  at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:401)
  at org.scalatest.SuperEngine.org$scalatest$SuperEngine$$runTestsInBranch(Engine.scala:396)
  at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:483)
  at org.scalatest.FunSuiteLike$class.runTests(FunSuiteLike.scala:208)
  at org.scalatest.FunSuite.runTests(FunSuite.scala:1555)
  at org.scalatest.Suite$class.run(Suite.scala:1424)
  at org.scalatest.FunSuite.org$scalatest$FunSuiteLike$$super$run(FunSuite.scala:1555)
  at org.scalatest.FunSuiteLike$$anonfun$run$1.apply(FunSuiteLike.scala:212)
  at org.scalatest.FunSuiteLike$$anonfun$run$1.apply(FunSuiteLike.scala:212)
  at org.scalatest.SuperEngine.runImpl(Engine.scala:545)
  at org.scalatest.FunSuiteLike$class.run(FunSuiteLike.scala:212)
  at org.apache.spark.sql.hive.DataSourceWithHiveMetastoreCatalogSuite.org$scalatest$BeforeAndAfterAll$$super$run(HiveMetastoreCatalogSuite.scala:52)
  at org.scalatest.BeforeAndAfterAll$class.liftedTree1$1(BeforeAndAfterAll.scala:257)
  at org.scalatest.BeforeAndAfterAll$class.run(BeforeAndAfterAll.scala:256)
  at org.apache.spark.sql.hive.DataSourceWithHiveMetastoreCatalogSuite.run(HiveMetastoreCatalogSuite.scala:52)
  at org.scalatest.tools.Framework.org$scalatest$tools$Framework$$runSuite(Framework.scala:462)
  at org.scalatest.tools.Framework$ScalaTestTask.execute(Framework.scala:671)
  at sbt.ForkMain$Run$2.call(ForkMain.java:294)
  at sbt.ForkMain$Run$2.call(ForkMain.java:284)
  at java.util.concurrent.FutureTask.run(FutureTask.java:262)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
  at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.UnsupportedOperationException: Unknown field type: binary
  at org.apache.hadoop.hive.ql.io.parquet.serde.ArrayWritableObjectInspector.getObjectInspector(ArrayWritableObjectInspector.java:108)
  at org.apache.hadoop.hive.ql.io.parquet.serde.ArrayWritableObjectInspector.<init>(ArrayWritableObjectInspector.java:60)
  at org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe.initialize(ParquetHiveSerDe.java:113)
  at org.apache.hadoop.hive.metastore.MetaStoreUtils.getDeserializer(MetaStoreUtils.java:339)
  at org.apache.hadoop.hive.ql.metadata.Table.getDeserializerFromMetaStore(Table.java:288)
  at org.apache.hadoop.hive.ql.metadata.Table.checkValidity(Table.java:194)
  at org.apache.hadoop.hive.ql.metadata.Hive.createTable(Hive.java:597)
  ... 76 more
```

Author: Yin Huai <[email protected]>

Closes #8824 from yhuai/datasourceMetadata.

(cherry picked from commit 2204cdb)
Signed-off-by: Yin Huai <[email protected]>
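For reference, a minimal sketch of the setup the manual test above relies on, assuming a local Spark 1.5-era deployment. Only the two `spark.sql.hive.metastore.*` keys and the shape of the `CREATE TABLE` statement come from the commit message; the app name, master, table path, the concrete `parquet` provider, and the `0.13.1` version string are illustrative stand-ins.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext

// Illustrative local setup; only the two spark.sql.hive.metastore.* settings are
// taken from the commit message. The version string may need to match the Hive
// metastore you actually run against (e.g. "0.13.1").
val conf = new SparkConf()
  .setAppName("hive-0.13-metastore-repro")            // hypothetical app name
  .setMaster("local[*]")
  .set("spark.sql.hive.metastore.version", "0.13.1")
  .set("spark.sql.hive.metastore.jars", "maven")

val sc = new SparkContext(conf)
val sqlContext = new HiveContext(sc)

// A binary column in a Parquet-backed data source table is what used to make the
// Hive-compatible metadata path fail with "Unknown field type: binary".
sqlContext.sql(
  """CREATE TABLE t USING parquet
    |OPTIONS (path '/tmp/spark-10672-t')
    |AS SELECT 1 AS d1, cast("val_1" as binary) AS d2
  """.stripMargin)

sqlContext.sql("DESCRIBE FORMATTED t").collect().foreach(println)
```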
1 parent 54334d3 commit d83dcc9


sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala

Lines changed: 50 additions & 51 deletions
```diff
@@ -309,69 +309,68 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
     }
 
     // TODO: Support persisting partitioned data source relations in Hive compatible format
-    val hiveTable = (maybeSerDe, dataSource.relation) match {
+    val qualifiedTableName = tableIdent.quotedString
+    val (hiveCompitiableTable, logMessage) = (maybeSerDe, dataSource.relation) match {
       case (Some(serde), relation: HadoopFsRelation)
-          if relation.paths.length == 1 && relation.partitionColumns.isEmpty =>
-        // Hive ParquetSerDe doesn't support decimal type until 1.2.0.
-        val isParquetSerDe = serde.inputFormat.exists(_.toLowerCase.contains("parquet"))
-        val hasDecimalFields = relation.schema.existsRecursively(_.isInstanceOf[DecimalType])
-
-        val hiveParquetSupportsDecimal = client.version match {
-          case org.apache.spark.sql.hive.client.hive.v1_2 => true
-          case _ => false
-        }
-
-        if (isParquetSerDe && !hiveParquetSupportsDecimal && hasDecimalFields) {
-          // If Hive version is below 1.2.0, we cannot save Hive compatible schema to
-          // metastore when the file format is Parquet and the schema has DecimalType.
-          logWarning {
-            "Persisting Parquet relation with decimal field(s) into Hive metastore in Spark SQL " +
-              "specific format, which is NOT compatible with Hive. Because ParquetHiveSerDe in " +
-              s"Hive ${client.version.fullVersion} doesn't support decimal type. See HIVE-6384."
-          }
-          newSparkSQLSpecificMetastoreTable()
-        } else {
-          logInfo {
-            "Persisting data source relation with a single input path into Hive metastore in " +
-              s"Hive compatible format. Input path: ${relation.paths.head}"
-          }
-          newHiveCompatibleMetastoreTable(relation, serde)
-        }
+          if relation.paths.length == 1 && relation.partitionColumns.isEmpty =>
+        val hiveTable = newHiveCompatibleMetastoreTable(relation, serde)
+        val message =
+          s"Persisting data source relation $qualifiedTableName with a single input path " +
+            s"into Hive metastore in Hive compatible format. Input path: ${relation.paths.head}."
+        (Some(hiveTable), message)
 
       case (Some(serde), relation: HadoopFsRelation) if relation.partitionColumns.nonEmpty =>
-        logWarning {
-          "Persisting partitioned data source relation into Hive metastore in " +
-            s"Spark SQL specific format, which is NOT compatible with Hive. Input path(s): " +
-            relation.paths.mkString("\n", "\n", "")
-        }
-        newSparkSQLSpecificMetastoreTable()
+        val message =
+          s"Persisting partitioned data source relation $qualifiedTableName into " +
+            "Hive metastore in Spark SQL specific format, which is NOT compatible with Hive. " +
+            "Input path(s): " + relation.paths.mkString("\n", "\n", "")
+        (None, message)
 
       case (Some(serde), relation: HadoopFsRelation) =>
-        logWarning {
-          "Persisting data source relation with multiple input paths into Hive metastore in " +
-            s"Spark SQL specific format, which is NOT compatible with Hive. Input paths: " +
-            relation.paths.mkString("\n", "\n", "")
-        }
-        newSparkSQLSpecificMetastoreTable()
+        val message =
+          s"Persisting data source relation $qualifiedTableName with multiple input paths into " +
+            "Hive metastore in Spark SQL specific format, which is NOT compatible with Hive. " +
+            s"Input paths: " + relation.paths.mkString("\n", "\n", "")
+        (None, message)
 
       case (Some(serde), _) =>
-        logWarning {
-          s"Data source relation is not a ${classOf[HadoopFsRelation].getSimpleName}. " +
-            "Persisting it into Hive metastore in Spark SQL specific format, " +
-            "which is NOT compatible with Hive."
-        }
-        newSparkSQLSpecificMetastoreTable()
+        val message =
+          s"Data source relation $qualifiedTableName is not a " +
+            s"${classOf[HadoopFsRelation].getSimpleName}. Persisting it into Hive metastore " +
+            "in Spark SQL specific format, which is NOT compatible with Hive."
+        (None, message)
 
       case _ =>
-        logWarning {
+        val message =
           s"Couldn't find corresponding Hive SerDe for data source provider $provider. " +
-            "Persisting data source relation into Hive metastore in Spark SQL specific format, " +
-            "which is NOT compatible with Hive."
-          }
-        newSparkSQLSpecificMetastoreTable()
+            s"Persisting data source relation $qualifiedTableName into Hive metastore in " +
+            s"Spark SQL specific format, which is NOT compatible with Hive."
+        (None, message)
     }
 
-    client.createTable(hiveTable)
+    (hiveCompitiableTable, logMessage) match {
+      case (Some(table), message) =>
+        // We first try to save the metadata of the table in a Hive compatiable way.
+        // If Hive throws an error, we fall back to save its metadata in the Spark SQL
+        // specific way.
+        try {
+          logInfo(message)
+          client.createTable(table)
+        } catch {
+          case throwable: Throwable =>
+            val warningMessage =
+              s"Could not persist $qualifiedTableName in a Hive compatible way. Persisting " +
+                s"it into Hive metastore in Spark SQL specific format."
+            logWarning(warningMessage, throwable)
+            val sparkSqlSpecificTable = newSparkSQLSpecificMetastoreTable()
+            client.createTable(sparkSqlSpecificTable)
+        }
+
+      case (None, message) =>
+        logWarning(message)
+        val hiveTable = newSparkSQLSpecificMetastoreTable()
+        client.createTable(hiveTable)
+    }
   }
 
   def hiveDefaultTableFilePath(tableName: String): String = {
```
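Distilled from the diff above: the new logic first builds the Hive compatible table definition when one is possible, tries to create it in the metastore, and falls back to the Spark SQL specific definition if the metastore rejects it. Below is a minimal, self-contained sketch of that fallback pattern using hypothetical stand-in types (`TableDef`, `MetastoreClient`) rather than Spark's internal `HiveTable`/`ClientInterface`; it illustrates the control flow only, not the actual Spark API.

```scala
// Hypothetical stand-ins for Spark's internal table/client types, used only to
// illustrate the try-then-fall-back pattern introduced by this commit.
case class TableDef(name: String, hiveCompatible: Boolean)

trait MetastoreClient {
  def createTable(table: TableDef): Unit
}

def persistTable(
    client: MetastoreClient,
    qualifiedName: String,
    hiveCompatible: Option[TableDef],
    sparkSqlSpecific: => TableDef): Unit = {
  hiveCompatible match {
    case Some(table) =>
      // Prefer Hive compatible metadata so other Hive clients can read the table.
      try {
        client.createTable(table)
      } catch {
        case e: Throwable =>
          // Hive rejected the schema (e.g. binary/decimal columns on older Hive
          // versions); fall back to the Spark SQL specific representation.
          println(s"Could not persist $qualifiedName in a Hive compatible way: $e")
          client.createTable(sparkSqlSpecific)
      }
    case None =>
      // No Hive compatible form exists (partitioned or multi-path relations, or
      // no matching SerDe); persist the Spark SQL specific form directly.
      client.createTable(sparkSqlSpecific)
  }
}
```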
