diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index d7eb14356b8b1..58868999cc193 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -300,6 +300,8 @@ class ParquetFileFormat
       filters: Seq[Filter],
       options: Map[String, String],
       hadoopConf: Configuration): (PartitionedFile) => Iterator[InternalRow] = {
+    val parquetOptions = new ParquetOptions(options, sparkSession.sessionState.conf)
+
     hadoopConf.set(ParquetInputFormat.READ_SUPPORT_CLASS, classOf[ParquetReadSupport].getName)
     hadoopConf.set(
       ParquetReadSupport.SPARK_ROW_REQUESTED_SCHEMA,
@@ -313,6 +315,10 @@ class ParquetFileFormat
     hadoopConf.setBoolean(
       SQLConf.CASE_SENSITIVE.key,
       sparkSession.sessionState.conf.caseSensitiveAnalysis)
+    hadoopConf.set(
+      ParquetOptions.DUPLICATED_FIELDS_RESOLUTION_MODE,
+      parquetOptions.duplicatedFieldsResolutionMode
+    )
 
     ParquetWriteSupport.setSchema(requiredSchema, hadoopConf)
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala
index 9cfc30725f03a..df3281a258a5b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala
@@ -69,12 +69,25 @@ class ParquetOptions(
     .get(MERGE_SCHEMA)
     .map(_.toBoolean)
     .getOrElse(sqlConf.isParquetSchemaMergingEnabled)
+
+  /**
+   * How to resolve duplicated field names. By default, the Parquet data source fails when it
+   * hits duplicated field names in case-insensitive mode. When converting a Hive Parquet table
+   * to the Parquet data source, we need to ask the Parquet data source to pick the first matched
+   * field - the same behavior as the Hive Parquet table - to keep the behaviors consistent.
+   */
+  val duplicatedFieldsResolutionMode: String = {
+    parameters.getOrElse(DUPLICATED_FIELDS_RESOLUTION_MODE,
+      ParquetDuplicatedFieldsResolutionMode.FAIL.toString)
+  }
 }
 
 
 object ParquetOptions {
   val MERGE_SCHEMA = "mergeSchema"
 
+  val DUPLICATED_FIELDS_RESOLUTION_MODE = "duplicatedFieldsResolutionMode"
+
   // The parquet compression short names
   private val shortParquetCompressionCodecNames = Map(
     "none" -> CompressionCodecName.UNCOMPRESSED,
@@ -90,3 +103,7 @@ object ParquetOptions {
     shortParquetCompressionCodecNames(name).name()
   }
 }
+
+object ParquetDuplicatedFieldsResolutionMode extends Enumeration {
+  val FAIL, FIRST_MATCH = Value
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala
index 3319e73f2b313..a3fe8fe67f8cd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala
@@ -65,17 +65,16 @@ private[parquet] class ParquetReadSupport(val convertTz: Option[TimeZone])
    * readers. Responsible for figuring out Parquet requested schema used for column pruning.
    */
   override def init(context: InitContext): ReadContext = {
+    val conf = context.getConfiguration
+
     catalystRequestedSchema = {
-      val conf = context.getConfiguration
       val schemaString = conf.get(ParquetReadSupport.SPARK_ROW_REQUESTED_SCHEMA)
       assert(schemaString != null, "Parquet requested schema not set.")
       StructType.fromString(schemaString)
     }
 
-    val caseSensitive = context.getConfiguration.getBoolean(SQLConf.CASE_SENSITIVE.key,
-      SQLConf.CASE_SENSITIVE.defaultValue.get)
     val parquetRequestedSchema = ParquetReadSupport.clipParquetSchema(
-      context.getFileSchema, catalystRequestedSchema, caseSensitive)
+      context.getFileSchema, catalystRequestedSchema, conf)
 
     new ReadContext(parquetRequestedSchema, Map.empty[String, String].asJava)
   }
@@ -123,9 +122,9 @@ private[parquet] object ParquetReadSupport {
   def clipParquetSchema(
       parquetSchema: MessageType,
       catalystSchema: StructType,
-      caseSensitive: Boolean = true): MessageType = {
+      conf: Configuration): MessageType = {
     val clippedParquetFields = clipParquetGroupFields(
-      parquetSchema.asGroupType(), catalystSchema, caseSensitive)
+      parquetSchema.asGroupType(), catalystSchema, conf)
     if (clippedParquetFields.isEmpty) {
       ParquetSchemaConverter.EMPTY_MESSAGE
     } else {
@@ -137,20 +136,20 @@ private[parquet] object ParquetReadSupport {
   }
 
   private def clipParquetType(
-      parquetType: Type, catalystType: DataType, caseSensitive: Boolean): Type = {
+      parquetType: Type, catalystType: DataType, conf: Configuration): Type = {
     catalystType match {
       case t: ArrayType if !isPrimitiveCatalystType(t.elementType) =>
         // Only clips array types with nested type as element type.
-        clipParquetListType(parquetType.asGroupType(), t.elementType, caseSensitive)
+        clipParquetListType(parquetType.asGroupType(), t.elementType, conf)
 
       case t: MapType
           if !isPrimitiveCatalystType(t.keyType) || !isPrimitiveCatalystType(t.valueType) =>
         // Only clips map types with nested key type or value type
-        clipParquetMapType(parquetType.asGroupType(), t.keyType, t.valueType, caseSensitive)
+        clipParquetMapType(parquetType.asGroupType(), t.keyType, t.valueType, conf)
 
       case t: StructType =>
-        clipParquetGroup(parquetType.asGroupType(), t, caseSensitive)
+        clipParquetGroup(parquetType.asGroupType(), t, conf)
 
       case _ =>
         // UDTs and primitive types are not clipped. For UDTs, a clipped version might not be able
@@ -177,14 +176,14 @@ private[parquet] object ParquetReadSupport {
    * [[StructType]].
    */
   private def clipParquetListType(
-      parquetList: GroupType, elementType: DataType, caseSensitive: Boolean): Type = {
+      parquetList: GroupType, elementType: DataType, conf: Configuration): Type = {
     // Precondition of this method, should only be called for lists with nested element types.
     assert(!isPrimitiveCatalystType(elementType))
 
     // Unannotated repeated group should be interpreted as required list of required element, so
     // list element type is just the group itself. Clip it.
     if (parquetList.getOriginalType == null && parquetList.isRepetition(Repetition.REPEATED)) {
-      clipParquetType(parquetList, elementType, caseSensitive)
+      clipParquetType(parquetList, elementType, conf)
     } else {
       assert(
         parquetList.getOriginalType == OriginalType.LIST,
@@ -216,7 +215,7 @@ private[parquet] object ParquetReadSupport {
         Types
           .buildGroup(parquetList.getRepetition)
           .as(OriginalType.LIST)
-          .addField(clipParquetType(repeatedGroup, elementType, caseSensitive))
+          .addField(clipParquetType(repeatedGroup, elementType, conf))
           .named(parquetList.getName)
       } else {
         // Otherwise, the repeated field's type is the element type with the repeated field's
@@ -227,7 +226,7 @@ private[parquet] object ParquetReadSupport {
           .addField(
             Types
               .repeatedGroup()
-              .addField(clipParquetType(repeatedGroup.getType(0), elementType, caseSensitive))
+              .addField(clipParquetType(repeatedGroup.getType(0), elementType, conf))
               .named(repeatedGroup.getName))
           .named(parquetList.getName)
       }
@@ -243,7 +242,7 @@ private[parquet] object ParquetReadSupport {
       parquetMap: GroupType,
       keyType: DataType,
       valueType: DataType,
-      caseSensitive: Boolean): GroupType = {
+      conf: Configuration): GroupType = {
     // Precondition of this method, only handles maps with nested key types or value types.
     assert(!isPrimitiveCatalystType(keyType) || !isPrimitiveCatalystType(valueType))
 
@@ -255,8 +254,8 @@ private[parquet] object ParquetReadSupport {
       Types
         .repeatedGroup()
         .as(repeatedGroup.getOriginalType)
-        .addField(clipParquetType(parquetKeyType, keyType, caseSensitive))
-        .addField(clipParquetType(parquetValueType, valueType, caseSensitive))
+        .addField(clipParquetType(parquetKeyType, keyType, conf))
+        .addField(clipParquetType(parquetValueType, valueType, conf))
         .named(repeatedGroup.getName)
 
     Types
@@ -275,8 +274,8 @@ private[parquet] object ParquetReadSupport {
    * pruning.
    */
   private def clipParquetGroup(
-      parquetRecord: GroupType, structType: StructType, caseSensitive: Boolean): GroupType = {
-    val clippedParquetFields = clipParquetGroupFields(parquetRecord, structType, caseSensitive)
+      parquetRecord: GroupType, structType: StructType, conf: Configuration): GroupType = {
+    val clippedParquetFields = clipParquetGroupFields(parquetRecord, structType, conf)
     Types
       .buildGroup(parquetRecord.getRepetition)
       .as(parquetRecord.getOriginalType)
@@ -290,7 +289,9 @@ private[parquet] object ParquetReadSupport {
    * @return A list of clipped [[GroupType]] fields, which can be empty.
    */
   private def clipParquetGroupFields(
-      parquetRecord: GroupType, structType: StructType, caseSensitive: Boolean): Seq[Type] = {
+      parquetRecord: GroupType, structType: StructType, conf: Configuration): Seq[Type] = {
+    val caseSensitive = conf.getBoolean(SQLConf.CASE_SENSITIVE.key,
+      SQLConf.CASE_SENSITIVE.defaultValue.get)
     val toParquet = new SparkToParquetSchemaConverter(writeLegacyParquetFormat = false)
     if (caseSensitive) {
       val caseSensitiveParquetFieldMap =
@@ -298,7 +299,7 @@ private[parquet] object ParquetReadSupport {
       structType.map { f =>
         caseSensitiveParquetFieldMap
           .get(f.name)
-          .map(clipParquetType(_, f.dataType, caseSensitive))
+          .map(clipParquetType(_, f.dataType, conf))
           .getOrElse(toParquet.convertField(f))
       }
     } else {
@@ -310,12 +311,19 @@ private[parquet] object ParquetReadSupport {
           .get(f.name.toLowerCase(Locale.ROOT))
           .map { parquetTypes =>
             if (parquetTypes.size > 1) {
-              // Need to fail if there is ambiguity, i.e. more than one field is matched
-              val parquetTypesString = parquetTypes.map(_.getName).mkString("[", ", ", "]")
-              throw new RuntimeException(s"""Found duplicate field(s) "${f.name}": """ +
-                s"$parquetTypesString in case-insensitive mode")
+              val resolutionMode = ParquetDuplicatedFieldsResolutionMode.withName(
+                conf.get(ParquetOptions.DUPLICATED_FIELDS_RESOLUTION_MODE,
+                  ParquetDuplicatedFieldsResolutionMode.FAIL.toString))
+              resolutionMode match {
+                case ParquetDuplicatedFieldsResolutionMode.FAIL =>
+                  val parquetTypesString = parquetTypes.map(_.getName).mkString("[", ", ", "]")
+                  throw new RuntimeException(s"""Found duplicate field(s) "${f.name}": """ +
+                    s"$parquetTypesString in case-insensitive mode")
+                case ParquetDuplicatedFieldsResolutionMode.FIRST_MATCH =>
+                  clipParquetType(parquetTypes.head, f.dataType, conf)
+              }
             } else {
-              clipParquetType(parquetTypes.head, f.dataType, caseSensitive)
+              clipParquetType(parquetTypes.head, f.dataType, conf)
             }
           }.getOrElse(toParquet.convertField(f))
       }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
index 7eefedb8ff5bb..e1e8b7a3e3241 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.execution.datasources.parquet
 import scala.reflect.ClassTag
 import scala.reflect.runtime.universe.TypeTag
 
+import org.apache.hadoop.conf.Configuration
 import org.apache.parquet.io.ParquetDecodingException
 import org.apache.parquet.schema.{MessageType, MessageTypeParser}
 
@@ -1015,9 +1016,13 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
       parquetSchema: String,
       catalystSchema: StructType,
       expectedSchema: String,
-      caseSensitive: Boolean = true): Unit = {
+      conf: Configuration = {
+        val conf = new Configuration()
+        conf.setBoolean(SQLConf.CASE_SENSITIVE.key, true)
+        conf
+      }): Unit = {
     testSchemaClipping(testName, parquetSchema, catalystSchema,
-      MessageTypeParser.parseMessageType(expectedSchema), caseSensitive)
+      MessageTypeParser.parseMessageType(expectedSchema), conf)
   }
 
   private def testSchemaClipping(
@@ -1025,10 +1030,10 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
       parquetSchema: String,
       catalystSchema: StructType,
       expectedSchema: MessageType,
-      caseSensitive: Boolean): Unit = {
+      conf: Configuration): Unit = {
     test(s"Clipping - $testName") {
       val actual = ParquetReadSupport.clipParquetSchema(
-        MessageTypeParser.parseMessageType(parquetSchema), catalystSchema, caseSensitive)
+        MessageTypeParser.parseMessageType(parquetSchema), catalystSchema, conf)
 
       try {
         expectedSchema.checkContains(actual)
@@ -1390,7 +1395,11 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
     catalystSchema = new StructType(),
 
     expectedSchema = ParquetSchemaConverter.EMPTY_MESSAGE,
-    caseSensitive = true)
+    conf = {
+      val conf = new Configuration()
+      conf.setBoolean(SQLConf.CASE_SENSITIVE.key, true)
+      conf
+    })
 
   testSchemaClipping(
     "disjoint field sets",
@@ -1572,9 +1581,13 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
        |  optional int32 c;
        |}
      """.stripMargin,
-    caseSensitive = false)
+    conf = {
+      val conf = new Configuration()
+      conf.setBoolean(SQLConf.CASE_SENSITIVE.key, false)
+      conf
+    })
 
-  test("Clipping - case-insensitive resolution: more than one field is matched") {
+  test("Clipping - case-insensitive resolution with ambiguity: fail to resolve fields") {
     val parquetSchema =
       """message root {
         |  required group A {
@@ -1590,9 +1603,45 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
         .add("a", nestedType, nullable = true)
         .add("c", IntegerType, nullable = true)
     }
+
     assertThrows[RuntimeException] {
+      val conf = new Configuration()
+      conf.setBoolean(SQLConf.CASE_SENSITIVE.key, false)
       ParquetReadSupport.clipParquetSchema(
-        MessageTypeParser.parseMessageType(parquetSchema), catalystSchema, caseSensitive = false)
+        MessageTypeParser.parseMessageType(parquetSchema), catalystSchema, conf)
     }
   }
+
+  testSchemaClipping(
+    "case-insensitive resolution with ambiguity: pick the first matched field",
+    parquetSchema =
+      """message root {
+        |  required group A {
+        |    optional int32 B;
+        |  }
+        |  optional int32 c;
+        |  optional int32 a;
+        |}
+      """.stripMargin,
+    catalystSchema = {
+      val nestedType = new StructType().add("b", IntegerType, nullable = true)
+      new StructType()
+        .add("a", nestedType, nullable = true)
+        .add("c", IntegerType, nullable = true)
+    },
+    expectedSchema =
+      """message root {
+        |  required group A {
+        |    optional int32 B;
+        |  }
+        |  optional int32 c;
+        |}
+      """.stripMargin,
+    conf = {
+      val conf = new Configuration()
+      conf.setBoolean(SQLConf.CASE_SENSITIVE.key, false)
+      conf.set(ParquetOptions.DUPLICATED_FIELDS_RESOLUTION_MODE,
+        ParquetDuplicatedFieldsResolutionMode.FIRST_MATCH.toString)
+      conf
+    })
 }
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
index 9fe83bb332a9a..443a96b23d343 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -32,7 +32,8 @@ import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.command.{CreateTableCommand, DDLUtils}
 import org.apache.spark.sql.execution.datasources.{CreateTable, LogicalRelation}
-import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, ParquetOptions}
+import org.apache.spark.sql.execution.datasources.parquet.{ParquetDuplicatedFieldsResolutionMode,
+  ParquetFileFormat, ParquetOptions}
 import org.apache.spark.sql.hive.execution._
 import org.apache.spark.sql.internal.{HiveSerDe, SQLConf}
 
@@ -181,9 +182,20 @@ case class RelationConversions(
     conf: SQLConf,
     sessionCatalog: HiveSessionCatalog) extends Rule[LogicalPlan] {
   private def isConvertible(relation: HiveTableRelation): Boolean = {
+    var isConvertible = false
     val serde = relation.tableMeta.storage.serde.getOrElse("").toLowerCase(Locale.ROOT)
-    serde.contains("parquet") && conf.getConf(HiveUtils.CONVERT_METASTORE_PARQUET) ||
-      serde.contains("orc") && conf.getConf(HiveUtils.CONVERT_METASTORE_ORC)
+    if (serde.contains("parquet") && conf.getConf(HiveUtils.CONVERT_METASTORE_PARQUET)) {
+      if (conf.getConf(SQLConf.CASE_SENSITIVE)) {
+        logWarning("The conversion can only be done when the behaviors are consistent. We skip " +
+          "the conversion in case-sensitive mode because Hive Parquet tables always do " +
+          "case-insensitive field resolution.")
+      } else {
+        isConvertible = true
+      }
+    } else if (serde.contains("orc") && conf.getConf(HiveUtils.CONVERT_METASTORE_ORC)) {
+      isConvertible = true
+    }
+    isConvertible
   }
 
   // Return true for Apache ORC and Hive ORC-related configuration names.
@@ -200,9 +212,14 @@ case class RelationConversions(
     // Consider table and storage properties. For properties existing in both sides, storage
     // properties will supersede table properties.
     if (serde.contains("parquet")) {
+      logInfo("When converting a Hive Parquet table to the Parquet data source, we switch the " +
+        "duplicated fields resolution mode so that the Parquet data source picks the first " +
+        "matched field - the same behavior as the Hive Parquet table - to keep behaviors consistent.")
       val options = relation.tableMeta.properties.filterKeys(isParquetProperty) ++
         relation.tableMeta.storage.properties + (ParquetOptions.MERGE_SCHEMA ->
-        conf.getConf(HiveUtils.CONVERT_METASTORE_PARQUET_WITH_SCHEMA_MERGING).toString)
+        conf.getConf(HiveUtils.CONVERT_METASTORE_PARQUET_WITH_SCHEMA_MERGING).toString) +
+        (ParquetOptions.DUPLICATED_FIELDS_RESOLUTION_MODE ->
+          ParquetDuplicatedFieldsResolutionMode.FIRST_MATCH.toString)
       sessionCatalog.metastoreCatalog
         .convertToLogicalRelation(relation, options, classOf[ParquetFileFormat], "parquet")
     } else {
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
index 2327d83a1b4f6..b18762cd1661c 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
@@ -677,6 +677,77 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {
 
     checkAnswer(selfJoin, sql("SELECT * FROM normal_parquet x CROSS JOIN normal_parquet y"))
   }
+
+  def checkConversion(df: DataFrame, converted: Boolean): Unit = {
+    val queryExecution = df.queryExecution
+    if (converted) {
+      queryExecution.analyzed.collectFirst {
+        case _: LogicalRelation =>
+      }.getOrElse {
+        fail(s"Expecting the query plan to convert parquet to data sources, " +
+          s"but got:\n$queryExecution")
+      }
+    } else {
+      queryExecution.analyzed.collectFirst {
+        case _: HiveTableRelation =>
+      }.getOrElse {
+        fail(s"Expecting no conversion from parquet to data sources, " +
+          s"but got:\n$queryExecution")
+      }
+    }
+  }
+
+  test("hive parquet table conversion keeps duplicated field resolution behavior consistent") {
+    withTempDir { dir =>
+      val tableName = "hive_parquet_with_duplicated_fields"
+      withTable(tableName) {
+        val data = spark.range(10).selectExpr("id as a", "id * 2 as A")
+        var col_a: Array[Row] = null
+        withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") {
+          data.write.format("parquet").mode("overwrite")
+            .save(dir.getCanonicalPath)
+          col_a = data.select("a").collect()
+        }
+        sql(s"""
+          |CREATE TABLE $tableName (A LONG) STORED AS parquet
+          |LOCATION '${dir.getCanonicalPath}'
+        """.stripMargin)
+
+        // The Hive Parquet table always does case-insensitive field resolution.
+        // When there is ambiguity, it picks the first matched field.
+        withSQLConf(HiveUtils.CONVERT_METASTORE_PARQUET.key -> "false") {
+          withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") {
+            val df = sql(s"select A from $tableName")
+            checkConversion(df, false)
+            checkAnswer(df, col_a)
+          }
+          withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") {
+            val df = sql(s"select A from $tableName")
+            checkConversion(df, false)
+            checkAnswer(sql(s"select A from $tableName"), col_a)
+          }
+        }
+
+        // The conversion is only done when the behaviors can be kept consistent.
+        withSQLConf(HiveUtils.CONVERT_METASTORE_PARQUET.key -> "true") {
+          // If we cannot keep the behaviors consistent, we skip the conversion.
+          withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") {
+            val df = sql(s"select A from $tableName")
+            checkConversion(df, false)
+            checkAnswer(df, col_a)
+          }
+          // When converting a Hive Parquet table to the Parquet data source, we switch the
+          // duplicated fields resolution mode so that the Parquet data source picks the first
+          // matched field - the same behavior as the Hive Parquet table - to keep the
+          // behaviors consistent.
+          withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") {
+            val df = sql(s"select A from $tableName")
+            checkConversion(df, true)
+            checkAnswer(df, col_a)
+          }
+        }
+      }
+    }
+  }
 }
 
 /**
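Reviewer note (illustrative only, not part of the patch): besides the Hive table conversion path above, the new reader option introduced in ParquetOptions can in principle be set directly on a Parquet read, since ParquetFileFormat now forwards it into the Hadoop configuration that ParquetReadSupport consults. The sketch below is a minimal usage example under these assumptions; the file path and application name are made up, and the option key "duplicatedFieldsResolutionMode" with values "FAIL" (default) and "FIRST_MATCH" are the ones added by this patch.

    // Minimal usage sketch, assuming a Spark build with this patch applied and the default
    // case-insensitive analysis (spark.sql.caseSensitive=false), which is the only mode where
    // duplicated field resolution applies.
    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder()
      .appName("duplicated-fields-resolution-demo")
      .master("local[1]")
      .getOrCreate()

    val df = spark.read
      .option("duplicatedFieldsResolutionMode", "FIRST_MATCH") // default is "FAIL"
      .parquet("/tmp/parquet_with_duplicated_fields")          // hypothetical file with both `a` and `A`
    df.select("a").show()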