
Commit 5b2bd93

dedup first
1 parent 10c437e

File tree: 4 files changed, +39 -43 lines changed


sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala

Lines changed: 25 additions & 3 deletions
@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.datasources.parquet
 
 import java.io.IOException
 import java.net.URI
+import java.util.Locale
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable
@@ -36,7 +37,7 @@ import org.apache.parquet.hadoop._
 import org.apache.parquet.hadoop.ParquetOutputFormat.JobSummaryLevel
 import org.apache.parquet.hadoop.codec.CodecConfig
 import org.apache.parquet.hadoop.util.ContextUtil
-import org.apache.parquet.schema.MessageType
+import org.apache.parquet.schema.{GroupType, MessageType}
 
 import org.apache.spark.{SparkException, TaskContext}
 import org.apache.spark.internal.Logging
@@ -367,11 +368,32 @@ class ParquetFileFormat
 
       val sharedConf = broadcastedHadoopConf.value.value
 
-      lazy val footerFileMetaData =
+      val footerFileMetaData =
         ParquetFileReader.readFooter(sharedConf, filePath, SKIP_ROW_GROUPS).getFileMetaData
+      val parquetSchema = footerFileMetaData.getSchema
+
+      def checkDuplicateFields(parquetRecord: GroupType): Unit = {
+        val fields = parquetRecord.getFields.asScala
+        val fieldMap = fields.groupBy(_.getName.toLowerCase(Locale.ROOT))
+        fieldMap.foreach { case (_, types) =>
+          if (types.size > 1) {
+            // Need to fail if there is ambiguity, i.e. more than one field is matched
+            val typesString = types.map(_.getName).mkString("[", ", ", "]")
+            throw new RuntimeException(s"Found duplicate field(s): " +
+              s"$typesString in case-insensitive mode")
+          }
+        }
+
+        fields.filter(!_.isPrimitive).foreach { groupField =>
+          checkDuplicateFields(groupField.asGroupType())
+        }
+      }
+      if (!isCaseSensitive) {
+        checkDuplicateFields(parquetSchema)
+      }
+
       // Try to push down filters when filter push-down is enabled.
       val pushed = if (enableParquetFilterPushDown) {
-        val parquetSchema = footerFileMetaData.getSchema
        val parquetFilters = new ParquetFilters(pushDownDate, pushDownTimestamp, pushDownDecimal,
           pushDownStringStartWith, pushDownInFilterThreshold, isCaseSensitive)
         filters
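
Note: the new check boils down to grouping field names by their lower-cased form and failing when any group has more than one entry, applied recursively to nested groups. A minimal standalone sketch of that idea over plain strings rather than Parquet's GroupType (the detectDuplicates helper and sample names are illustrative, not part of this commit):

import java.util.Locale

// Illustrative helper mirroring checkDuplicateFields, but over plain field
// names instead of a recursive Parquet GroupType walk.
def detectDuplicates(fieldNames: Seq[String]): Unit = {
  fieldNames.groupBy(_.toLowerCase(Locale.ROOT)).foreach { case (_, names) =>
    if (names.size > 1) {
      throw new RuntimeException(
        s"Found duplicate field(s): ${names.mkString("[", ", ", "]")} in case-insensitive mode")
    }
  }
}

detectDuplicates(Seq("id", "name"))        // ok: no case-insensitive collision
detectDuplicates(Seq("id", "ID", "name"))  // throws: Found duplicate field(s): [id, ID] ...

One consequence of running the check here is that the footer metadata is now read eagerly (val instead of lazy val), since the schema is needed up front.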

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala

Lines changed: 4 additions & 12 deletions
@@ -364,24 +364,16 @@ private[parquet] class ParquetFilters(
     // Here we don't flatten the fields in the nested schema but just look up through
     // root fields. Currently, accessing nested fields does not push down filters,
     // and creating filters for them is not supported.
-    val primitiveFields =
+    val primitiveFieldMap =
       dataType.getFields.asScala.filter(_.isPrimitive).map(_.asPrimitiveType()).map { f =>
         f.getName -> ParquetField(f.getName,
           ParquetSchemaType(f.getOriginalType,
             f.getPrimitiveTypeName, f.getTypeLength, f.getDecimalMetadata))
-      }
+      }.toMap
     if (caseSensitive) {
-      primitiveFields.toMap
+      primitiveFieldMap
     } else {
-      // Don't consider ambiguity here, i.e. more than one field is matched in case insensitive
-      // mode, just skip pushdown for these fields, they will trigger Exception when reading,
-      // See: SPARK-25132.
-      val dedupPrimitiveFields =
-        primitiveFields
-          .groupBy(_._1.toLowerCase(Locale.ROOT))
-          .filter(_._2.size == 1)
-          .mapValues(_.head._2)
-      CaseInsensitiveMap(dedupPrimitiveFields)
+      CaseInsensitiveMap(primitiveFieldMap)
     }
   }
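
Note: since ambiguous names now fail fast in ParquetFileFormat, the filter path no longer needs the SPARK-25132 dedup step and can hand the map straight to CaseInsensitiveMap. A rough sketch of what that wrapper provides, as a simplified stand-in rather than Spark's actual org.apache.spark.sql.catalyst.util.CaseInsensitiveMap:

import java.util.Locale

// Simplified stand-in: lookups ignore key case, which is safe here because
// keys are guaranteed not to collide once lower-cased (checked upstream).
class SimpleCaseInsensitiveMap[V](original: Map[String, V]) {
  private val lowered = original.map { case (k, v) => k.toLowerCase(Locale.ROOT) -> v }
  def get(key: String): Option[V] = lowered.get(key.toLowerCase(Locale.ROOT))
}

val fields = new SimpleCaseInsensitiveMap(Map("userId" -> 1))
assert(fields.get("USERID").contains(1)) // "USERID" resolves to "userId"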

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala

Lines changed: 9 additions & 27 deletions
@@ -30,6 +30,7 @@ import org.apache.parquet.schema.Type.Repetition
 
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 
@@ -292,33 +293,14 @@ private[parquet] object ParquetReadSupport {
   private def clipParquetGroupFields(
       parquetRecord: GroupType, structType: StructType, caseSensitive: Boolean): Seq[Type] = {
     val toParquet = new SparkToParquetSchemaConverter(writeLegacyParquetFormat = false)
-    if (caseSensitive) {
-      val caseSensitiveParquetFieldMap =
-        parquetRecord.getFields.asScala.map(f => f.getName -> f).toMap
-      structType.map { f =>
-        caseSensitiveParquetFieldMap
-          .get(f.name)
-          .map(clipParquetType(_, f.dataType, caseSensitive))
-          .getOrElse(toParquet.convertField(f))
-      }
-    } else {
-      // Do case-insensitive resolution only if in case-insensitive mode
-      val caseInsensitiveParquetFieldMap =
-        parquetRecord.getFields.asScala.groupBy(_.getName.toLowerCase(Locale.ROOT))
-      structType.map { f =>
-        caseInsensitiveParquetFieldMap
-          .get(f.name.toLowerCase(Locale.ROOT))
-          .map { parquetTypes =>
-            if (parquetTypes.size > 1) {
-              // Need to fail if there is ambiguity, i.e. more than one field is matched
-              val parquetTypesString = parquetTypes.map(_.getName).mkString("[", ", ", "]")
-              throw new RuntimeException(s"""Found duplicate field(s) "${f.name}": """ +
-                s"$parquetTypesString in case-insensitive mode")
-            } else {
-              clipParquetType(parquetTypes.head, f.dataType, caseSensitive)
-            }
-          }.getOrElse(toParquet.convertField(f))
-      }
+    val fieldMap = parquetRecord.getFields.asScala.map(f => f.getName -> f).toMap
+    val finalParquetFieldMap =
+      if (caseSensitive) fieldMap else CaseInsensitiveMap(fieldMap)
+    structType.map { f =>
+      finalParquetFieldMap
+        .get(f.name)
+        .map(clipParquetType(_, f.dataType, caseSensitive))
+        .getOrElse(toParquet.convertField(f))
     }
   }
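
Note: the two branches collapse into one lookup path: build the name-to-field map once, wrap it in CaseInsensitiveMap only when case-insensitive, then resolve each requested Catalyst field with a fallback conversion when the file lacks it. The same pattern in isolation (resolveOrConvert and the sample data are made up for illustration):

import java.util.Locale

// Illustrative resolution with fallback, mirroring clipParquetGroupFields:
// look the requested name up in the file's fields; if absent, synthesize it.
def resolveOrConvert(requested: String,
                     fileFields: Map[String, String],
                     caseSensitive: Boolean): String = {
  val lowered = fileFields.map { case (k, v) => k.toLowerCase(Locale.ROOT) -> v }
  val hit =
    if (caseSensitive) fileFields.get(requested)
    else lowered.get(requested.toLowerCase(Locale.ROOT))
  hit.getOrElse(s"<converted from Catalyst: $requested>")
}

val fileFields = Map("userId" -> "INT64")
assert(resolveOrConvert("USERID", fileFields, caseSensitive = false) == "INT64")
assert(resolveOrConvert("USERID", fileFields, caseSensitive = true).startsWith("<converted"))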

sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala

Lines changed: 1 addition & 1 deletion
@@ -1106,7 +1106,7 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContext
   }
 
   test("SPARK-25207: exception when duplicate fields in case-insensitive mode") {
-    withTempDir { dir =>
+    withTempPath { dir =>
       val tableName = "spark_25207"
       val tableDir = dir.getAbsoluteFile + "/table"
       withTable(tableName) {
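
Note: as I understand Spark's test utilities (in SQLTestUtils), withTempDir hands the block a directory that already exists, while withTempPath hands it a path that is deleted first, so the code under test creates it itself. Hedged, simplified stand-ins for the two helpers; the real ones clean up recursively:

import java.io.File
import java.nio.file.Files

// Simplified stand-in: the block receives an existing temporary directory.
def withTempDir(f: File => Unit): Unit = {
  val dir = Files.createTempDirectory("spark-test").toFile
  try f(dir) finally dir.delete()
}

// Simplified stand-in: the block receives a path that does not exist yet.
def withTempPath(f: File => Unit): Unit = {
  val path = Files.createTempDirectory("spark-test").toFile
  path.delete() // hand over a non-existent path
  try f(path) finally path.delete()
}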
