From 5902afe6fb6e88f98fb4f2649e59156264bc3e4d Mon Sep 17 00:00:00 2001
From: yucai
Date: Thu, 23 Aug 2018 15:16:42 +0800
Subject: [PATCH 01/16] [SPARK-25207][SQL] Case-insensitive field resolution for
 filter pushdown when reading Parquet

---
 .../parquet/ParquetFileFormat.scala | 2 +-
 .../datasources/parquet/ParquetFilters.scala | 70 ++++++++++++------
 .../parquet/ParquetFilterSuite.scala | 74 +++++++++++++++++++
 3 files changed, 121 insertions(+), 25 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index d7eb14356b8b..5bc5fede8b6a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -377,7 +377,7 @@ class ParquetFileFormat
           // Collects all converted Parquet filter predicates. Notice that not all predicates can be
           // converted (`ParquetFilters.createFilter` returns an `Option`). That's why a `flatMap`
           // is used here.
-          .flatMap(parquetFilters.createFilter(parquetSchema, _))
+          .flatMap(parquetFilters.createFilter(parquetSchema, _, sqlConf.caseSensitiveAnalysis))
           .reduceOption(FilterApi.and)
       } else {
         None
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
index 58b4a769fcb6..dec989ab363a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.execution.datasources.parquet
 import java.lang.{Boolean => JBoolean, Double => JDouble, Float => JFloat, Long => JLong}
 import java.math.{BigDecimal => JBigDecimal}
 import java.sql.{Date, Timestamp}
+import java.util.Locale
 
 import scala.collection.JavaConverters.asScalaBufferConverter
 
@@ -31,7 +32,7 @@ import org.apache.parquet.schema.OriginalType._
 import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName
 import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName._
 
-import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils}
 import org.apache.spark.sql.catalyst.util.DateTimeUtils.SQLDate
 import org.apache.spark.sql.sources
 import org.apache.spark.unsafe.types.UTF8String
@@ -350,25 +351,46 @@ private[parquet] class ParquetFilters(
   }
 
   /**
-   * Returns a map from name of the column to the data type, if predicate push down applies.
+   * Returns nameMap and typeMap based on different case sensitive mode, if predicate push
+   * down applies.
    */
-  private def getFieldMap(dataType: MessageType): Map[String, ParquetSchemaType] = dataType match {
-    case m: MessageType =>
-      // Here we don't flatten the fields in the nested schema but just look up through
-      // root fields. Currently, accessing to nested fields does not push down filters
-      // and it does not support to create filters for them.
- m.getFields.asScala.filter(_.isPrimitive).map(_.asPrimitiveType()).map { f => + private def getFieldMaps(dataType: MessageType, caseSensitive: Boolean) + : (Map[String, String], Map[String, ParquetSchemaType]) = { + // Here we don't flatten the fields in the nested schema but just look up through + // root fields. Currently, accessing to nested fields does not push down filters + // and it does not support to create filters for them. + val primitiveFields = dataType.getFields.asScala.filter(_.isPrimitive).map(_.asPrimitiveType()) + if (caseSensitive) { + val nameMap = primitiveFields.map { f => + f.getName -> f.getName + }.toMap + val typeMap = primitiveFields.map { f => f.getName -> ParquetSchemaType( f.getOriginalType, f.getPrimitiveTypeName, f.getTypeLength, f.getDecimalMetadata) }.toMap - case _ => Map.empty[String, ParquetSchemaType] + (nameMap, typeMap) + } else { + // Don't consider ambiguity here, i.e. more than one field is matched in case insensitive + // mode, just skip pushdown for these fields, they will trigger Exception when reading, + // See: SPARK-25132. + val dedupFields = primitiveFields.map { f => + f.getName -> ParquetSchemaType( + f.getOriginalType, f.getPrimitiveTypeName, f.getTypeLength, f.getDecimalMetadata) + }.groupBy(_._1.toLowerCase(Locale.ROOT)).filter(_._2.size == 1).mapValues(_.head) + val nameMap = CaseInsensitiveMap(dedupFields.mapValues(_._1)) + val typeMap = CaseInsensitiveMap(dedupFields.mapValues(_._2)) + (nameMap, typeMap) + } } /** * Converts data sources filters to Parquet filter predicates. */ - def createFilter(schema: MessageType, predicate: sources.Filter): Option[FilterPredicate] = { - val nameToType = getFieldMap(schema) + def createFilter( + schema: MessageType, + predicate: sources.Filter, + caseSensitive: Boolean = true): Option[FilterPredicate] = { + val (nameMap, typeMap) = getFieldMaps(schema, caseSensitive) // Decimal type must make sure that filter value's scale matched the file. // If doesn't matched, which would cause data corruption. @@ -381,7 +403,7 @@ private[parquet] class ParquetFilters( // Parquet's type in the given file should be matched to the value's type // in the pushed filter in order to push down the filter to Parquet. def valueCanMakeFilterOn(name: String, value: Any): Boolean = { - value == null || (nameToType(name) match { + value == null || (typeMap(name) match { case ParquetBooleanType => value.isInstanceOf[JBoolean] case ParquetByteType | ParquetShortType | ParquetIntegerType => value.isInstanceOf[Number] case ParquetLongType => value.isInstanceOf[JLong] @@ -408,7 +430,7 @@ private[parquet] class ParquetFilters( // filters for the column having dots in the names. Thus, we do not push down such filters. // See SPARK-20364. 
def canMakeFilterOn(name: String, value: Any): Boolean = { - nameToType.contains(name) && !name.contains(".") && valueCanMakeFilterOn(name, value) + typeMap.contains(name) && !name.contains(".") && valueCanMakeFilterOn(name, value) } // NOTE: @@ -428,29 +450,29 @@ private[parquet] class ParquetFilters( predicate match { case sources.IsNull(name) if canMakeFilterOn(name, null) => - makeEq.lift(nameToType(name)).map(_(name, null)) + makeEq.lift(typeMap(name)).map(_(nameMap(name), null)) case sources.IsNotNull(name) if canMakeFilterOn(name, null) => - makeNotEq.lift(nameToType(name)).map(_(name, null)) + makeNotEq.lift(typeMap(name)).map(_(nameMap(name), null)) case sources.EqualTo(name, value) if canMakeFilterOn(name, value) => - makeEq.lift(nameToType(name)).map(_(name, value)) + makeEq.lift(typeMap(name)).map(_(nameMap(name), value)) case sources.Not(sources.EqualTo(name, value)) if canMakeFilterOn(name, value) => - makeNotEq.lift(nameToType(name)).map(_(name, value)) + makeNotEq.lift(typeMap(name)).map(_(nameMap(name), value)) case sources.EqualNullSafe(name, value) if canMakeFilterOn(name, value) => - makeEq.lift(nameToType(name)).map(_(name, value)) + makeEq.lift(typeMap(name)).map(_(nameMap(name), value)) case sources.Not(sources.EqualNullSafe(name, value)) if canMakeFilterOn(name, value) => - makeNotEq.lift(nameToType(name)).map(_(name, value)) + makeNotEq.lift(typeMap(name)).map(_(nameMap(name), value)) case sources.LessThan(name, value) if canMakeFilterOn(name, value) => - makeLt.lift(nameToType(name)).map(_(name, value)) + makeLt.lift(typeMap(name)).map(_(nameMap(name), value)) case sources.LessThanOrEqual(name, value) if canMakeFilterOn(name, value) => - makeLtEq.lift(nameToType(name)).map(_(name, value)) + makeLtEq.lift(typeMap(name)).map(_(nameMap(name), value)) case sources.GreaterThan(name, value) if canMakeFilterOn(name, value) => - makeGt.lift(nameToType(name)).map(_(name, value)) + makeGt.lift(typeMap(name)).map(_(nameMap(name), value)) case sources.GreaterThanOrEqual(name, value) if canMakeFilterOn(name, value) => - makeGtEq.lift(nameToType(name)).map(_(name, value)) + makeGtEq.lift(typeMap(name)).map(_(nameMap(name), value)) case sources.And(lhs, rhs) => // At here, it is not safe to just convert one side if we do not understand the @@ -477,7 +499,7 @@ private[parquet] class ParquetFilters( case sources.In(name, values) if canMakeFilterOn(name, values.head) && values.distinct.length <= pushDownInFilterThreshold => values.distinct.flatMap { v => - makeEq.lift(nameToType(name)).map(_(name, v)) + makeEq.lift(typeMap(name)).map(_(nameMap(name), v)) }.reduceLeftOption(FilterApi.or) case sources.StringStartsWith(name, prefix) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala index be4f498c921a..fbc40cfe1d1e 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala @@ -1021,6 +1021,80 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex } } } + + test("Case-insensitive field resolution for pushdown when reading parquet") { + def testCaseInsensitiveResolution( + schema: StructType, + expected: FilterPredicate, + filter: sources.Filter): Unit = { + val parquetSchema = new SparkToParquetSchemaConverter(conf).convert(schema) + + 
assertResult(Some(expected)) { + parquetFilters.createFilter(parquetSchema, filter, caseSensitive = false) + } + assertResult(None) { + parquetFilters.createFilter(parquetSchema, filter, caseSensitive = true) + } + } + + val schema = StructType(Seq(StructField("cint", IntegerType))) + + testCaseInsensitiveResolution( + schema, FilterApi.eq(intColumn("cint"), null.asInstanceOf[Integer]), sources.IsNull("CINT")) + + testCaseInsensitiveResolution( + schema, + FilterApi.notEq(intColumn("cint"), null.asInstanceOf[Integer]), + sources.IsNotNull("CINT")) + + testCaseInsensitiveResolution( + schema, FilterApi.eq(intColumn("cint"), 1000: Integer), sources.EqualTo("CINT", 1000)) + + testCaseInsensitiveResolution( + schema, + FilterApi.notEq(intColumn("cint"), 1000: Integer), + sources.Not(sources.EqualTo("CINT", 1000))) + + testCaseInsensitiveResolution( + schema, FilterApi.eq(intColumn("cint"), 1000: Integer), sources.EqualNullSafe("CINT", 1000)) + + testCaseInsensitiveResolution( + schema, + FilterApi.notEq(intColumn("cint"), 1000: Integer), + sources.Not(sources.EqualNullSafe("CINT", 1000))) + + testCaseInsensitiveResolution( + schema, + FilterApi.lt(intColumn("cint"), 1000: Integer), sources.LessThan("CINT", 1000)) + + testCaseInsensitiveResolution( + schema, + FilterApi.ltEq(intColumn("cint"), 1000: Integer), + sources.LessThanOrEqual("CINT", 1000)) + + testCaseInsensitiveResolution( + schema, FilterApi.gt(intColumn("cint"), 1000: Integer), sources.GreaterThan("CINT", 1000)) + + testCaseInsensitiveResolution( + schema, + FilterApi.gtEq(intColumn("cint"), 1000: Integer), + sources.GreaterThanOrEqual("CINT", 1000)) + + testCaseInsensitiveResolution( + schema, + FilterApi.or( + FilterApi.eq(intColumn("cint"), 10: Integer), + FilterApi.eq(intColumn("cint"), 20: Integer)), + sources.In("CINT", Array(10, 20))) + + val dupFieldSchema = StructType( + Seq(StructField("cint", IntegerType), StructField("cINT", IntegerType))) + val dupParquetSchema = new SparkToParquetSchemaConverter(conf).convert(dupFieldSchema) + assertResult(None) { + parquetFilters.createFilter( + dupParquetSchema, sources.EqualTo("CINT", 1000), caseSensitive = false) + } + } } class NumRowGroupsAcc extends AccumulatorV2[Integer, Integer] { From 2226eaef2eab44c94d4ada0ea86e92c21eb5945d Mon Sep 17 00:00:00 2001 From: yucai Date: Thu, 23 Aug 2018 18:43:15 +0800 Subject: [PATCH 02/16] fix "org.apache.spark.sql.avro.AvroSuite.convert formats" --- .../sql/execution/datasources/parquet/ParquetFileFormat.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala index 5bc5fede8b6a..f26841dc7525 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala @@ -347,6 +347,7 @@ class ParquetFileFormat val pushDownDecimal = sqlConf.parquetFilterPushDownDecimal val pushDownStringStartWith = sqlConf.parquetFilterPushDownStringStartWith val pushDownInFilterThreshold = sqlConf.parquetFilterPushDownInFilterThreshold + val isCaseSensitive = sqlConf.caseSensitiveAnalysis (file: PartitionedFile) => { assert(file.partitionValues.numFields == partitionSchema.size) @@ -377,7 +378,7 @@ class ParquetFileFormat // Collects all converted Parquet filter predicates. 
Notice that not all predicates can be // converted (`ParquetFilters.createFilter` returns an `Option`). That's why a `flatMap` // is used here. - .flatMap(parquetFilters.createFilter(parquetSchema, _, sqlConf.caseSensitiveAnalysis)) + .flatMap(parquetFilters.createFilter(parquetSchema, _, isCaseSensitive)) .reduceOption(FilterApi.and) } else { None From c76189d96fe7f67123ba6558b1d53680398697fd Mon Sep 17 00:00:00 2001 From: yucai Date: Sat, 25 Aug 2018 13:12:29 +0800 Subject: [PATCH 03/16] address comments --- .../parquet/ParquetFileFormat.scala | 4 ++-- .../datasources/parquet/ParquetFilters.scala | 12 +++++------ .../parquet/ParquetFilterSuite.scala | 20 +++++++++++++------ 3 files changed, 21 insertions(+), 15 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala index f26841dc7525..ea4f1592a7c2 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala @@ -373,12 +373,12 @@ class ParquetFileFormat val pushed = if (enableParquetFilterPushDown) { val parquetSchema = footerFileMetaData.getSchema val parquetFilters = new ParquetFilters(pushDownDate, pushDownTimestamp, pushDownDecimal, - pushDownStringStartWith, pushDownInFilterThreshold) + pushDownStringStartWith, pushDownInFilterThreshold, isCaseSensitive) filters // Collects all converted Parquet filter predicates. Notice that not all predicates can be // converted (`ParquetFilters.createFilter` returns an `Option`). That's why a `flatMap` // is used here. - .flatMap(parquetFilters.createFilter(parquetSchema, _, isCaseSensitive)) + .flatMap(parquetFilters.createFilter(parquetSchema, _)) .reduceOption(FilterApi.and) } else { None diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala index dec989ab363a..8796c3f7498c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala @@ -45,7 +45,8 @@ private[parquet] class ParquetFilters( pushDownTimestamp: Boolean, pushDownDecimal: Boolean, pushDownStartWith: Boolean, - pushDownInFilterThreshold: Int) { + pushDownInFilterThreshold: Int, + caseSensitive: Boolean) { private case class ParquetSchemaType( originalType: OriginalType, @@ -354,7 +355,7 @@ private[parquet] class ParquetFilters( * Returns nameMap and typeMap based on different case sensitive mode, if predicate push * down applies. */ - private def getFieldMaps(dataType: MessageType, caseSensitive: Boolean) + private def getFieldMaps(dataType: MessageType) : (Map[String, String], Map[String, ParquetSchemaType]) = { // Here we don't flatten the fields in the nested schema but just look up through // root fields. Currently, accessing to nested fields does not push down filters @@ -386,11 +387,8 @@ private[parquet] class ParquetFilters( /** * Converts data sources filters to Parquet filter predicates. 
*/ - def createFilter( - schema: MessageType, - predicate: sources.Filter, - caseSensitive: Boolean = true): Option[FilterPredicate] = { - val (nameMap, typeMap) = getFieldMaps(schema, caseSensitive) + def createFilter(schema: MessageType, predicate: sources.Filter): Option[FilterPredicate] = { + val (nameMap, typeMap) = getFieldMaps(schema) // Decimal type must make sure that filter value's scale matched the file. // If doesn't matched, which would cause data corruption. diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala index fbc40cfe1d1e..5e382773145a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala @@ -60,7 +60,7 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex private lazy val parquetFilters = new ParquetFilters(conf.parquetFilterPushDownDate, conf.parquetFilterPushDownTimestamp, conf.parquetFilterPushDownDecimal, conf.parquetFilterPushDownStringStartWith, - conf.parquetFilterPushDownInFilterThreshold) + conf.parquetFilterPushDownInFilterThreshold, conf.caseSensitiveAnalysis) override def beforeEach(): Unit = { super.beforeEach() @@ -1022,7 +1022,15 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex } } - test("Case-insensitive field resolution for pushdown when reading parquet") { + test("SPARK-25132: Case-insensitive field resolution for pushdown when reading parquet") { + val caseSensitiveParquetFilters = + new ParquetFilters(conf.parquetFilterPushDownDate, conf.parquetFilterPushDownTimestamp, + conf.parquetFilterPushDownDecimal, conf.parquetFilterPushDownStringStartWith, + conf.parquetFilterPushDownInFilterThreshold, caseSensitive = true) + val caseInsensitiveParquetFilters = + new ParquetFilters(conf.parquetFilterPushDownDate, conf.parquetFilterPushDownTimestamp, + conf.parquetFilterPushDownDecimal, conf.parquetFilterPushDownStringStartWith, + conf.parquetFilterPushDownInFilterThreshold, caseSensitive = false) def testCaseInsensitiveResolution( schema: StructType, expected: FilterPredicate, @@ -1030,10 +1038,10 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex val parquetSchema = new SparkToParquetSchemaConverter(conf).convert(schema) assertResult(Some(expected)) { - parquetFilters.createFilter(parquetSchema, filter, caseSensitive = false) + caseInsensitiveParquetFilters.createFilter(parquetSchema, filter) } assertResult(None) { - parquetFilters.createFilter(parquetSchema, filter, caseSensitive = true) + caseSensitiveParquetFilters.createFilter(parquetSchema, filter) } } @@ -1091,8 +1099,8 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex Seq(StructField("cint", IntegerType), StructField("cINT", IntegerType))) val dupParquetSchema = new SparkToParquetSchemaConverter(conf).convert(dupFieldSchema) assertResult(None) { - parquetFilters.createFilter( - dupParquetSchema, sources.EqualTo("CINT", 1000), caseSensitive = false) + caseInsensitiveParquetFilters.createFilter( + dupParquetSchema, sources.EqualTo("CINT", 1000)) } } } From 10cd89ff46892724d513a68bd82cf1a022387436 Mon Sep 17 00:00:00 2001 From: yucai Date: Sat, 25 Aug 2018 23:40:27 +0800 Subject: [PATCH 04/16] address comments --- 
.../datasources/parquet/ParquetFilters.scala | 67 +++++++++---------- .../parquet/ParquetFilterSuite.scala | 31 ++++++++- 2 files changed, 63 insertions(+), 35 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala index 8796c3f7498c..c1ab37c9334a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala @@ -48,6 +48,10 @@ private[parquet] class ParquetFilters( pushDownInFilterThreshold: Int, caseSensitive: Boolean) { + private case class ParquetField( + name: String, + schema: ParquetSchemaType) + private case class ParquetSchemaType( originalType: OriginalType, primitiveTypeName: PrimitiveTypeName, @@ -352,35 +356,30 @@ private[parquet] class ParquetFilters( } /** - * Returns nameMap and typeMap based on different case sensitive mode, if predicate push - * down applies. + * Returns a map, which contains parquet field name and data type, if predicate push down applies. */ - private def getFieldMaps(dataType: MessageType) - : (Map[String, String], Map[String, ParquetSchemaType]) = { + private def getFieldMap(dataType: MessageType): Map[String, ParquetField] = { // Here we don't flatten the fields in the nested schema but just look up through // root fields. Currently, accessing to nested fields does not push down filters // and it does not support to create filters for them. - val primitiveFields = dataType.getFields.asScala.filter(_.isPrimitive).map(_.asPrimitiveType()) + val primitiveFields = + dataType.getFields.asScala.filter(_.isPrimitive).map(_.asPrimitiveType()).map { f => + f.getName -> ParquetField(f.getName, + ParquetSchemaType(f.getOriginalType, + f.getPrimitiveTypeName, f.getTypeLength, f.getDecimalMetadata)) + } if (caseSensitive) { - val nameMap = primitiveFields.map { f => - f.getName -> f.getName - }.toMap - val typeMap = primitiveFields.map { f => - f.getName -> ParquetSchemaType( - f.getOriginalType, f.getPrimitiveTypeName, f.getTypeLength, f.getDecimalMetadata) - }.toMap - (nameMap, typeMap) + primitiveFields.toMap } else { // Don't consider ambiguity here, i.e. more than one field is matched in case insensitive // mode, just skip pushdown for these fields, they will trigger Exception when reading, // See: SPARK-25132. - val dedupFields = primitiveFields.map { f => - f.getName -> ParquetSchemaType( - f.getOriginalType, f.getPrimitiveTypeName, f.getTypeLength, f.getDecimalMetadata) - }.groupBy(_._1.toLowerCase(Locale.ROOT)).filter(_._2.size == 1).mapValues(_.head) - val nameMap = CaseInsensitiveMap(dedupFields.mapValues(_._1)) - val typeMap = CaseInsensitiveMap(dedupFields.mapValues(_._2)) - (nameMap, typeMap) + val dedupPrimitiveFields = + primitiveFields + .groupBy(_._1.toLowerCase(Locale.ROOT)) + .filter(_._2.size == 1) + .mapValues(_.head._2) + CaseInsensitiveMap(dedupPrimitiveFields) } } @@ -388,7 +387,7 @@ private[parquet] class ParquetFilters( * Converts data sources filters to Parquet filter predicates. */ def createFilter(schema: MessageType, predicate: sources.Filter): Option[FilterPredicate] = { - val (nameMap, typeMap) = getFieldMaps(schema) + val fieldMap = getFieldMap(schema) // Decimal type must make sure that filter value's scale matched the file. // If doesn't matched, which would cause data corruption. 
@@ -401,7 +400,7 @@ private[parquet] class ParquetFilters( // Parquet's type in the given file should be matched to the value's type // in the pushed filter in order to push down the filter to Parquet. def valueCanMakeFilterOn(name: String, value: Any): Boolean = { - value == null || (typeMap(name) match { + value == null || (fieldMap(name).schema match { case ParquetBooleanType => value.isInstanceOf[JBoolean] case ParquetByteType | ParquetShortType | ParquetIntegerType => value.isInstanceOf[Number] case ParquetLongType => value.isInstanceOf[JLong] @@ -428,7 +427,7 @@ private[parquet] class ParquetFilters( // filters for the column having dots in the names. Thus, we do not push down such filters. // See SPARK-20364. def canMakeFilterOn(name: String, value: Any): Boolean = { - typeMap.contains(name) && !name.contains(".") && valueCanMakeFilterOn(name, value) + fieldMap.contains(name) && !name.contains(".") && valueCanMakeFilterOn(name, value) } // NOTE: @@ -448,29 +447,29 @@ private[parquet] class ParquetFilters( predicate match { case sources.IsNull(name) if canMakeFilterOn(name, null) => - makeEq.lift(typeMap(name)).map(_(nameMap(name), null)) + makeEq.lift(fieldMap(name).schema).map(_(fieldMap(name).name, null)) case sources.IsNotNull(name) if canMakeFilterOn(name, null) => - makeNotEq.lift(typeMap(name)).map(_(nameMap(name), null)) + makeNotEq.lift(fieldMap(name).schema).map(_(fieldMap(name).name, null)) case sources.EqualTo(name, value) if canMakeFilterOn(name, value) => - makeEq.lift(typeMap(name)).map(_(nameMap(name), value)) + makeEq.lift(fieldMap(name).schema).map(_(fieldMap(name).name, value)) case sources.Not(sources.EqualTo(name, value)) if canMakeFilterOn(name, value) => - makeNotEq.lift(typeMap(name)).map(_(nameMap(name), value)) + makeNotEq.lift(fieldMap(name).schema).map(_(fieldMap(name).name, value)) case sources.EqualNullSafe(name, value) if canMakeFilterOn(name, value) => - makeEq.lift(typeMap(name)).map(_(nameMap(name), value)) + makeEq.lift(fieldMap(name).schema).map(_(fieldMap(name).name, value)) case sources.Not(sources.EqualNullSafe(name, value)) if canMakeFilterOn(name, value) => - makeNotEq.lift(typeMap(name)).map(_(nameMap(name), value)) + makeNotEq.lift(fieldMap(name).schema).map(_(fieldMap(name).name, value)) case sources.LessThan(name, value) if canMakeFilterOn(name, value) => - makeLt.lift(typeMap(name)).map(_(nameMap(name), value)) + makeLt.lift(fieldMap(name).schema).map(_(fieldMap(name).name, value)) case sources.LessThanOrEqual(name, value) if canMakeFilterOn(name, value) => - makeLtEq.lift(typeMap(name)).map(_(nameMap(name), value)) + makeLtEq.lift(fieldMap(name).schema).map(_(fieldMap(name).name, value)) case sources.GreaterThan(name, value) if canMakeFilterOn(name, value) => - makeGt.lift(typeMap(name)).map(_(nameMap(name), value)) + makeGt.lift(fieldMap(name).schema).map(_(fieldMap(name).name, value)) case sources.GreaterThanOrEqual(name, value) if canMakeFilterOn(name, value) => - makeGtEq.lift(typeMap(name)).map(_(nameMap(name), value)) + makeGtEq.lift(fieldMap(name).schema).map(_(fieldMap(name).name, value)) case sources.And(lhs, rhs) => // At here, it is not safe to just convert one side if we do not understand the @@ -497,7 +496,7 @@ private[parquet] class ParquetFilters( case sources.In(name, values) if canMakeFilterOn(name, values.head) && values.distinct.length <= pushDownInFilterThreshold => values.distinct.flatMap { v => - makeEq.lift(typeMap(name)).map(_(nameMap(name), v)) + makeEq.lift(fieldMap(name).schema).map(_(fieldMap(name).name, v)) 
}.reduceLeftOption(FilterApi.or) case sources.StringStartsWith(name, prefix) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala index 5e382773145a..c6a1b0a0ad3c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala @@ -25,6 +25,7 @@ import org.apache.parquet.filter2.predicate.{FilterApi, FilterPredicate, Operato import org.apache.parquet.filter2.predicate.FilterApi._ import org.apache.parquet.filter2.predicate.Operators.{Column => _, _} +import org.apache.spark.SparkException import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.dsl.expressions._ import org.apache.spark.sql.catalyst.expressions._ @@ -1022,15 +1023,17 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex } } - test("SPARK-25132: Case-insensitive field resolution for pushdown when reading parquet") { + test("SPARK-25207: Case-insensitive field resolution for pushdown when reading parquet") { val caseSensitiveParquetFilters = new ParquetFilters(conf.parquetFilterPushDownDate, conf.parquetFilterPushDownTimestamp, conf.parquetFilterPushDownDecimal, conf.parquetFilterPushDownStringStartWith, conf.parquetFilterPushDownInFilterThreshold, caseSensitive = true) + val caseInsensitiveParquetFilters = new ParquetFilters(conf.parquetFilterPushDownDate, conf.parquetFilterPushDownTimestamp, conf.parquetFilterPushDownDecimal, conf.parquetFilterPushDownStringStartWith, conf.parquetFilterPushDownInFilterThreshold, caseSensitive = false) + def testCaseInsensitiveResolution( schema: StructType, expected: FilterPredicate, @@ -1103,6 +1106,32 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex dupParquetSchema, sources.EqualTo("CINT", 1000)) } } + + test("SPARK-25207: Case-insensitive field resolution for pushdown when reading parquet" + + " - exception when duplicate fields in case-insensitive mode") { + withTempDir { dir => + val tableName = "spark_25207" + val tableDir = dir.getAbsoluteFile + "/table" + withTable(tableName) { + withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") { + spark.range(10).selectExpr("id as A", "id as B", "id as b") + .write.mode("overwrite").parquet(tableDir) + } + sql( + s""" + |CREATE TABLE $tableName (A LONG, B LONG) USING PARQUET LOCATION '$tableDir' + """.stripMargin) + + withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") { + val e = intercept[SparkException] { + sql(s"select a from $tableName where b > 0").collect() + } + assert(e.getCause.isInstanceOf[RuntimeException] && e.getCause.getMessage.contains( + """Found duplicate field(s) "B": [B, b] in case-insensitive mode""")) + } + } + } + } } class NumRowGroupsAcc extends AccumulatorV2[Integer, Integer] { From 1ea94cca149b7a4814ea617f673111ea0f26219d Mon Sep 17 00:00:00 2001 From: yucai Date: Sun, 26 Aug 2018 11:05:18 +0800 Subject: [PATCH 05/16] address comments --- .../datasources/parquet/ParquetFilters.scala | 30 +++++++++---------- .../parquet/ParquetFilterSuite.scala | 15 ++++------ 2 files changed, 21 insertions(+), 24 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala index 
c1ab37c9334a..455c92ca4b01 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala @@ -49,7 +49,7 @@ private[parquet] class ParquetFilters( caseSensitive: Boolean) { private case class ParquetField( - name: String, + resolvedName: String, schema: ParquetSchemaType) private case class ParquetSchemaType( @@ -387,7 +387,7 @@ private[parquet] class ParquetFilters( * Converts data sources filters to Parquet filter predicates. */ def createFilter(schema: MessageType, predicate: sources.Filter): Option[FilterPredicate] = { - val fieldMap = getFieldMap(schema) + val nameToParquet = getFieldMap(schema) // Decimal type must make sure that filter value's scale matched the file. // If doesn't matched, which would cause data corruption. @@ -400,7 +400,7 @@ private[parquet] class ParquetFilters( // Parquet's type in the given file should be matched to the value's type // in the pushed filter in order to push down the filter to Parquet. def valueCanMakeFilterOn(name: String, value: Any): Boolean = { - value == null || (fieldMap(name).schema match { + value == null || (nameToParquet(name).schema match { case ParquetBooleanType => value.isInstanceOf[JBoolean] case ParquetByteType | ParquetShortType | ParquetIntegerType => value.isInstanceOf[Number] case ParquetLongType => value.isInstanceOf[JLong] @@ -427,7 +427,7 @@ private[parquet] class ParquetFilters( // filters for the column having dots in the names. Thus, we do not push down such filters. // See SPARK-20364. def canMakeFilterOn(name: String, value: Any): Boolean = { - fieldMap.contains(name) && !name.contains(".") && valueCanMakeFilterOn(name, value) + nameToParquet.contains(name) && !name.contains(".") && valueCanMakeFilterOn(name, value) } // NOTE: @@ -447,29 +447,29 @@ private[parquet] class ParquetFilters( predicate match { case sources.IsNull(name) if canMakeFilterOn(name, null) => - makeEq.lift(fieldMap(name).schema).map(_(fieldMap(name).name, null)) + makeEq.lift(nameToParquet(name).schema).map(_(nameToParquet(name).resolvedName, null)) case sources.IsNotNull(name) if canMakeFilterOn(name, null) => - makeNotEq.lift(fieldMap(name).schema).map(_(fieldMap(name).name, null)) + makeNotEq.lift(nameToParquet(name).schema).map(_(nameToParquet(name).resolvedName, null)) case sources.EqualTo(name, value) if canMakeFilterOn(name, value) => - makeEq.lift(fieldMap(name).schema).map(_(fieldMap(name).name, value)) + makeEq.lift(nameToParquet(name).schema).map(_(nameToParquet(name).resolvedName, value)) case sources.Not(sources.EqualTo(name, value)) if canMakeFilterOn(name, value) => - makeNotEq.lift(fieldMap(name).schema).map(_(fieldMap(name).name, value)) + makeNotEq.lift(nameToParquet(name).schema).map(_(nameToParquet(name).resolvedName, value)) case sources.EqualNullSafe(name, value) if canMakeFilterOn(name, value) => - makeEq.lift(fieldMap(name).schema).map(_(fieldMap(name).name, value)) + makeEq.lift(nameToParquet(name).schema).map(_(nameToParquet(name).resolvedName, value)) case sources.Not(sources.EqualNullSafe(name, value)) if canMakeFilterOn(name, value) => - makeNotEq.lift(fieldMap(name).schema).map(_(fieldMap(name).name, value)) + makeNotEq.lift(nameToParquet(name).schema).map(_(nameToParquet(name).resolvedName, value)) case sources.LessThan(name, value) if canMakeFilterOn(name, value) => - makeLt.lift(fieldMap(name).schema).map(_(fieldMap(name).name, value)) + 
makeLt.lift(nameToParquet(name).schema).map(_(nameToParquet(name).resolvedName, value)) case sources.LessThanOrEqual(name, value) if canMakeFilterOn(name, value) => - makeLtEq.lift(fieldMap(name).schema).map(_(fieldMap(name).name, value)) + makeLtEq.lift(nameToParquet(name).schema).map(_(nameToParquet(name).resolvedName, value)) case sources.GreaterThan(name, value) if canMakeFilterOn(name, value) => - makeGt.lift(fieldMap(name).schema).map(_(fieldMap(name).name, value)) + makeGt.lift(nameToParquet(name).schema).map(_(nameToParquet(name).resolvedName, value)) case sources.GreaterThanOrEqual(name, value) if canMakeFilterOn(name, value) => - makeGtEq.lift(fieldMap(name).schema).map(_(fieldMap(name).name, value)) + makeGtEq.lift(nameToParquet(name).schema).map(_(nameToParquet(name).resolvedName, value)) case sources.And(lhs, rhs) => // At here, it is not safe to just convert one side if we do not understand the @@ -496,7 +496,7 @@ private[parquet] class ParquetFilters( case sources.In(name, values) if canMakeFilterOn(name, values.head) && values.distinct.length <= pushDownInFilterThreshold => values.distinct.flatMap { v => - makeEq.lift(fieldMap(name).schema).map(_(fieldMap(name).name, v)) + makeEq.lift(nameToParquet(name).schema).map(_(nameToParquet(name).resolvedName, v)) }.reduceLeftOption(FilterApi.or) case sources.StringStartsWith(name, prefix) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala index c6a1b0a0ad3c..27fede71685f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala @@ -1024,15 +1024,13 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex } test("SPARK-25207: Case-insensitive field resolution for pushdown when reading parquet") { - val caseSensitiveParquetFilters = + def createParquetFilter(caseSensitive: Boolean): ParquetFilters = { new ParquetFilters(conf.parquetFilterPushDownDate, conf.parquetFilterPushDownTimestamp, conf.parquetFilterPushDownDecimal, conf.parquetFilterPushDownStringStartWith, - conf.parquetFilterPushDownInFilterThreshold, caseSensitive = true) - - val caseInsensitiveParquetFilters = - new ParquetFilters(conf.parquetFilterPushDownDate, conf.parquetFilterPushDownTimestamp, - conf.parquetFilterPushDownDecimal, conf.parquetFilterPushDownStringStartWith, - conf.parquetFilterPushDownInFilterThreshold, caseSensitive = false) + conf.parquetFilterPushDownInFilterThreshold, caseSensitive) + } + val caseSensitiveParquetFilters = createParquetFilter(caseSensitive = true) + val caseInsensitiveParquetFilters = createParquetFilter(caseSensitive = false) def testCaseInsensitiveResolution( schema: StructType, @@ -1107,8 +1105,7 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex } } - test("SPARK-25207: Case-insensitive field resolution for pushdown when reading parquet" + - " - exception when duplicate fields in case-insensitive mode") { + test("SPARK-25207: exception when duplicate fields in case-insensitive mode") { withTempDir { dir => val tableName = "spark_25207" val tableDir = dir.getAbsoluteFile + "/table" From 90b8717612ab43d97228d77deaae3d6f33add055 Mon Sep 17 00:00:00 2001 From: yucai Date: Sun, 26 Aug 2018 12:58:22 +0800 Subject: [PATCH 06/16] address comments --- 
.../datasources/parquet/ParquetFilters.scala | 45 ++++++++++++------- 1 file changed, 29 insertions(+), 16 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala index 455c92ca4b01..ed28708127e0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala @@ -49,8 +49,10 @@ private[parquet] class ParquetFilters( caseSensitive: Boolean) { private case class ParquetField( - resolvedName: String, - schema: ParquetSchemaType) + // field name in parquet file + fieldName: String, + // field schema type in parquet file + fieldSchema: ParquetSchemaType) private case class ParquetSchemaType( originalType: OriginalType, @@ -387,7 +389,7 @@ private[parquet] class ParquetFilters( * Converts data sources filters to Parquet filter predicates. */ def createFilter(schema: MessageType, predicate: sources.Filter): Option[FilterPredicate] = { - val nameToParquet = getFieldMap(schema) + val nameToParquetField = getFieldMap(schema) // Decimal type must make sure that filter value's scale matched the file. // If doesn't matched, which would cause data corruption. @@ -400,7 +402,7 @@ private[parquet] class ParquetFilters( // Parquet's type in the given file should be matched to the value's type // in the pushed filter in order to push down the filter to Parquet. def valueCanMakeFilterOn(name: String, value: Any): Boolean = { - value == null || (nameToParquet(name).schema match { + value == null || (nameToParquetField(name).fieldSchema match { case ParquetBooleanType => value.isInstanceOf[JBoolean] case ParquetByteType | ParquetShortType | ParquetIntegerType => value.isInstanceOf[Number] case ParquetLongType => value.isInstanceOf[JLong] @@ -427,7 +429,7 @@ private[parquet] class ParquetFilters( // filters for the column having dots in the names. Thus, we do not push down such filters. // See SPARK-20364. 
def canMakeFilterOn(name: String, value: Any): Boolean = { - nameToParquet.contains(name) && !name.contains(".") && valueCanMakeFilterOn(name, value) + nameToParquetField.contains(name) && !name.contains(".") && valueCanMakeFilterOn(name, value) } // NOTE: @@ -447,29 +449,39 @@ private[parquet] class ParquetFilters( predicate match { case sources.IsNull(name) if canMakeFilterOn(name, null) => - makeEq.lift(nameToParquet(name).schema).map(_(nameToParquet(name).resolvedName, null)) + makeEq.lift(nameToParquetField(name).fieldSchema) + .map(_(nameToParquetField(name).fieldName, null)) case sources.IsNotNull(name) if canMakeFilterOn(name, null) => - makeNotEq.lift(nameToParquet(name).schema).map(_(nameToParquet(name).resolvedName, null)) + makeNotEq.lift(nameToParquetField(name).fieldSchema) + .map(_(nameToParquetField(name).fieldName, null)) case sources.EqualTo(name, value) if canMakeFilterOn(name, value) => - makeEq.lift(nameToParquet(name).schema).map(_(nameToParquet(name).resolvedName, value)) + makeEq.lift(nameToParquetField(name).fieldSchema) + .map(_(nameToParquetField(name).fieldName, value)) case sources.Not(sources.EqualTo(name, value)) if canMakeFilterOn(name, value) => - makeNotEq.lift(nameToParquet(name).schema).map(_(nameToParquet(name).resolvedName, value)) + makeNotEq.lift(nameToParquetField(name).fieldSchema) + .map(_(nameToParquetField(name).fieldName, value)) case sources.EqualNullSafe(name, value) if canMakeFilterOn(name, value) => - makeEq.lift(nameToParquet(name).schema).map(_(nameToParquet(name).resolvedName, value)) + makeEq.lift(nameToParquetField(name).fieldSchema) + .map(_(nameToParquetField(name).fieldName, value)) case sources.Not(sources.EqualNullSafe(name, value)) if canMakeFilterOn(name, value) => - makeNotEq.lift(nameToParquet(name).schema).map(_(nameToParquet(name).resolvedName, value)) + makeNotEq.lift(nameToParquetField(name).fieldSchema) + .map(_(nameToParquetField(name).fieldName, value)) case sources.LessThan(name, value) if canMakeFilterOn(name, value) => - makeLt.lift(nameToParquet(name).schema).map(_(nameToParquet(name).resolvedName, value)) + makeLt.lift(nameToParquetField(name).fieldSchema) + .map(_(nameToParquetField(name).fieldName, value)) case sources.LessThanOrEqual(name, value) if canMakeFilterOn(name, value) => - makeLtEq.lift(nameToParquet(name).schema).map(_(nameToParquet(name).resolvedName, value)) + makeLtEq.lift(nameToParquetField(name).fieldSchema) + .map(_(nameToParquetField(name).fieldName, value)) case sources.GreaterThan(name, value) if canMakeFilterOn(name, value) => - makeGt.lift(nameToParquet(name).schema).map(_(nameToParquet(name).resolvedName, value)) + makeGt.lift(nameToParquetField(name).fieldSchema) + .map(_(nameToParquetField(name).fieldName, value)) case sources.GreaterThanOrEqual(name, value) if canMakeFilterOn(name, value) => - makeGtEq.lift(nameToParquet(name).schema).map(_(nameToParquet(name).resolvedName, value)) + makeGtEq.lift(nameToParquetField(name).fieldSchema) + .map(_(nameToParquetField(name).fieldName, value)) case sources.And(lhs, rhs) => // At here, it is not safe to just convert one side if we do not understand the @@ -496,7 +508,8 @@ private[parquet] class ParquetFilters( case sources.In(name, values) if canMakeFilterOn(name, values.head) && values.distinct.length <= pushDownInFilterThreshold => values.distinct.flatMap { v => - makeEq.lift(nameToParquet(name).schema).map(_(nameToParquet(name).resolvedName, v)) + makeEq.lift(nameToParquetField(name).fieldSchema) + .map(_(nameToParquetField(name).fieldName, 
v)) }.reduceLeftOption(FilterApi.or) case sources.StringStartsWith(name, prefix) From 10c437eaee6b767f744e5c2227b96efc4f9bc685 Mon Sep 17 00:00:00 2001 From: yucai Date: Sun, 26 Aug 2018 13:10:20 +0800 Subject: [PATCH 07/16] rename fieldSchema to fieldType --- .../datasources/parquet/ParquetFilters.scala | 28 +++++++++---------- 1 file changed, 14 insertions(+), 14 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala index ed28708127e0..3c6703767199 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala @@ -51,8 +51,8 @@ private[parquet] class ParquetFilters( private case class ParquetField( // field name in parquet file fieldName: String, - // field schema type in parquet file - fieldSchema: ParquetSchemaType) + // field type related info in parquet file + fieldType: ParquetSchemaType) private case class ParquetSchemaType( originalType: OriginalType, @@ -402,7 +402,7 @@ private[parquet] class ParquetFilters( // Parquet's type in the given file should be matched to the value's type // in the pushed filter in order to push down the filter to Parquet. def valueCanMakeFilterOn(name: String, value: Any): Boolean = { - value == null || (nameToParquetField(name).fieldSchema match { + value == null || (nameToParquetField(name).fieldType match { case ParquetBooleanType => value.isInstanceOf[JBoolean] case ParquetByteType | ParquetShortType | ParquetIntegerType => value.isInstanceOf[Number] case ParquetLongType => value.isInstanceOf[JLong] @@ -449,38 +449,38 @@ private[parquet] class ParquetFilters( predicate match { case sources.IsNull(name) if canMakeFilterOn(name, null) => - makeEq.lift(nameToParquetField(name).fieldSchema) + makeEq.lift(nameToParquetField(name).fieldType) .map(_(nameToParquetField(name).fieldName, null)) case sources.IsNotNull(name) if canMakeFilterOn(name, null) => - makeNotEq.lift(nameToParquetField(name).fieldSchema) + makeNotEq.lift(nameToParquetField(name).fieldType) .map(_(nameToParquetField(name).fieldName, null)) case sources.EqualTo(name, value) if canMakeFilterOn(name, value) => - makeEq.lift(nameToParquetField(name).fieldSchema) + makeEq.lift(nameToParquetField(name).fieldType) .map(_(nameToParquetField(name).fieldName, value)) case sources.Not(sources.EqualTo(name, value)) if canMakeFilterOn(name, value) => - makeNotEq.lift(nameToParquetField(name).fieldSchema) + makeNotEq.lift(nameToParquetField(name).fieldType) .map(_(nameToParquetField(name).fieldName, value)) case sources.EqualNullSafe(name, value) if canMakeFilterOn(name, value) => - makeEq.lift(nameToParquetField(name).fieldSchema) + makeEq.lift(nameToParquetField(name).fieldType) .map(_(nameToParquetField(name).fieldName, value)) case sources.Not(sources.EqualNullSafe(name, value)) if canMakeFilterOn(name, value) => - makeNotEq.lift(nameToParquetField(name).fieldSchema) + makeNotEq.lift(nameToParquetField(name).fieldType) .map(_(nameToParquetField(name).fieldName, value)) case sources.LessThan(name, value) if canMakeFilterOn(name, value) => - makeLt.lift(nameToParquetField(name).fieldSchema) + makeLt.lift(nameToParquetField(name).fieldType) .map(_(nameToParquetField(name).fieldName, value)) case sources.LessThanOrEqual(name, value) if canMakeFilterOn(name, value) => - 
makeLtEq.lift(nameToParquetField(name).fieldSchema) + makeLtEq.lift(nameToParquetField(name).fieldType) .map(_(nameToParquetField(name).fieldName, value)) case sources.GreaterThan(name, value) if canMakeFilterOn(name, value) => - makeGt.lift(nameToParquetField(name).fieldSchema) + makeGt.lift(nameToParquetField(name).fieldType) .map(_(nameToParquetField(name).fieldName, value)) case sources.GreaterThanOrEqual(name, value) if canMakeFilterOn(name, value) => - makeGtEq.lift(nameToParquetField(name).fieldSchema) + makeGtEq.lift(nameToParquetField(name).fieldType) .map(_(nameToParquetField(name).fieldName, value)) case sources.And(lhs, rhs) => @@ -508,7 +508,7 @@ private[parquet] class ParquetFilters( case sources.In(name, values) if canMakeFilterOn(name, values.head) && values.distinct.length <= pushDownInFilterThreshold => values.distinct.flatMap { v => - makeEq.lift(nameToParquetField(name).fieldSchema) + makeEq.lift(nameToParquetField(name).fieldType) .map(_(nameToParquetField(name).fieldName, v)) }.reduceLeftOption(FilterApi.or) From 5b2bd931cfce64dc4aaaba81b396ffeb5bd5a1f0 Mon Sep 17 00:00:00 2001 From: yucai Date: Thu, 30 Aug 2018 00:37:57 +0800 Subject: [PATCH 08/16] dedup first --- .../parquet/ParquetFileFormat.scala | 28 +++++++++++++-- .../datasources/parquet/ParquetFilters.scala | 16 +++------ .../parquet/ParquetReadSupport.scala | 36 +++++-------------- .../parquet/ParquetFilterSuite.scala | 2 +- 4 files changed, 39 insertions(+), 43 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala index ea4f1592a7c2..2dd5d8a61cd5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.datasources.parquet import java.io.IOException import java.net.URI +import java.util.Locale import scala.collection.JavaConverters._ import scala.collection.mutable @@ -36,7 +37,7 @@ import org.apache.parquet.hadoop._ import org.apache.parquet.hadoop.ParquetOutputFormat.JobSummaryLevel import org.apache.parquet.hadoop.codec.CodecConfig import org.apache.parquet.hadoop.util.ContextUtil -import org.apache.parquet.schema.MessageType +import org.apache.parquet.schema.{GroupType, MessageType} import org.apache.spark.{SparkException, TaskContext} import org.apache.spark.internal.Logging @@ -367,11 +368,32 @@ class ParquetFileFormat val sharedConf = broadcastedHadoopConf.value.value - lazy val footerFileMetaData = + val footerFileMetaData = ParquetFileReader.readFooter(sharedConf, filePath, SKIP_ROW_GROUPS).getFileMetaData + val parquetSchema = footerFileMetaData.getSchema + + def checkDuplicateFields(parquetRecord: GroupType): Unit = { + val fields = parquetRecord.getFields.asScala + val fieldMap = fields.groupBy(_.getName.toLowerCase(Locale.ROOT)) + fieldMap.foreach { case (_, types) => + if (types.size > 1) { + // Need to fail if there is ambiguity, i.e. 
more than one field is duplicate + val typesString = types.map(_.getName).mkString("[", ", ", "]") + throw new RuntimeException(s"Found duplicate field(s):" + + s"$typesString in case-insensitive mode") + } + } + + fields.filter(!_.isPrimitive).foreach { groupField => + checkDuplicateFields(groupField.asGroupType()) + } + } + if (!isCaseSensitive) { + checkDuplicateFields(parquetSchema) + } + // Try to push down filters when filter push-down is enabled. val pushed = if (enableParquetFilterPushDown) { - val parquetSchema = footerFileMetaData.getSchema val parquetFilters = new ParquetFilters(pushDownDate, pushDownTimestamp, pushDownDecimal, pushDownStringStartWith, pushDownInFilterThreshold, isCaseSensitive) filters diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala index 3c6703767199..a55abe5c46a4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala @@ -364,24 +364,16 @@ private[parquet] class ParquetFilters( // Here we don't flatten the fields in the nested schema but just look up through // root fields. Currently, accessing to nested fields does not push down filters // and it does not support to create filters for them. - val primitiveFields = + val primitiveFieldMap = dataType.getFields.asScala.filter(_.isPrimitive).map(_.asPrimitiveType()).map { f => f.getName -> ParquetField(f.getName, ParquetSchemaType(f.getOriginalType, f.getPrimitiveTypeName, f.getTypeLength, f.getDecimalMetadata)) - } + }.toMap if (caseSensitive) { - primitiveFields.toMap + primitiveFieldMap } else { - // Don't consider ambiguity here, i.e. more than one field is matched in case insensitive - // mode, just skip pushdown for these fields, they will trigger Exception when reading, - // See: SPARK-25132. 
- val dedupPrimitiveFields = - primitiveFields - .groupBy(_._1.toLowerCase(Locale.ROOT)) - .filter(_._2.size == 1) - .mapValues(_.head._2) - CaseInsensitiveMap(dedupPrimitiveFields) + CaseInsensitiveMap(primitiveFieldMap) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala index 3319e73f2b31..0846432d6484 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala @@ -30,6 +30,7 @@ import org.apache.parquet.schema.Type.Repetition import org.apache.spark.internal.Logging import org.apache.spark.sql.catalyst.expressions.UnsafeRow +import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ @@ -292,33 +293,14 @@ private[parquet] object ParquetReadSupport { private def clipParquetGroupFields( parquetRecord: GroupType, structType: StructType, caseSensitive: Boolean): Seq[Type] = { val toParquet = new SparkToParquetSchemaConverter(writeLegacyParquetFormat = false) - if (caseSensitive) { - val caseSensitiveParquetFieldMap = - parquetRecord.getFields.asScala.map(f => f.getName -> f).toMap - structType.map { f => - caseSensitiveParquetFieldMap - .get(f.name) - .map(clipParquetType(_, f.dataType, caseSensitive)) - .getOrElse(toParquet.convertField(f)) - } - } else { - // Do case-insensitive resolution only if in case-insensitive mode - val caseInsensitiveParquetFieldMap = - parquetRecord.getFields.asScala.groupBy(_.getName.toLowerCase(Locale.ROOT)) - structType.map { f => - caseInsensitiveParquetFieldMap - .get(f.name.toLowerCase(Locale.ROOT)) - .map { parquetTypes => - if (parquetTypes.size > 1) { - // Need to fail if there is ambiguity, i.e. 
more than one field is matched - val parquetTypesString = parquetTypes.map(_.getName).mkString("[", ", ", "]") - throw new RuntimeException(s"""Found duplicate field(s) "${f.name}": """ + - s"$parquetTypesString in case-insensitive mode") - } else { - clipParquetType(parquetTypes.head, f.dataType, caseSensitive) - } - }.getOrElse(toParquet.convertField(f)) - } + val fieldMap = parquetRecord.getFields.asScala.map(f => f.getName -> f).toMap + val finalParquetFieldMap = + if (caseSensitive) fieldMap else CaseInsensitiveMap(fieldMap) + structType.map { f => + finalParquetFieldMap + .get(f.name) + .map(clipParquetType(_, f.dataType, caseSensitive)) + .getOrElse(toParquet.convertField(f)) } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala index 27fede71685f..5e0a73891eae 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala @@ -1106,7 +1106,7 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex } test("SPARK-25207: exception when duplicate fields in case-insensitive mode") { - withTempDir { dir => + withTempPath { dir => val tableName = "spark_25207" val tableDir = dir.getAbsoluteFile + "/table" withTable(tableName) { From 9142f4925f5c127fe790c02bc8525e415aa1a60b Mon Sep 17 00:00:00 2001 From: yucai Date: Thu, 30 Aug 2018 01:51:32 +0800 Subject: [PATCH 09/16] improve uts --- .../parquet/ParquetFileFormat.scala | 6 +- .../spark/sql/FileBasedDataSourceSuite.scala | 66 ++++++++++++------- .../parquet/ParquetFilterSuite.scala | 33 ---------- 3 files changed, 43 insertions(+), 62 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala index 2dd5d8a61cd5..27309843440e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala @@ -311,9 +311,6 @@ class ParquetFileFormat hadoopConf.set( SQLConf.SESSION_LOCAL_TIMEZONE.key, sparkSession.sessionState.conf.sessionLocalTimeZone) - hadoopConf.setBoolean( - SQLConf.CASE_SENSITIVE.key, - sparkSession.sessionState.conf.caseSensitiveAnalysis) ParquetWriteSupport.setSchema(requiredSchema, hadoopConf) @@ -349,6 +346,7 @@ class ParquetFileFormat val pushDownStringStartWith = sqlConf.parquetFilterPushDownStringStartWith val pushDownInFilterThreshold = sqlConf.parquetFilterPushDownInFilterThreshold val isCaseSensitive = sqlConf.caseSensitiveAnalysis + hadoopConf.setBoolean(SQLConf.CASE_SENSITIVE.key, isCaseSensitive) (file: PartitionedFile) => { assert(file.partitionValues.numFields == partitionSchema.size) @@ -379,7 +377,7 @@ class ParquetFileFormat if (types.size > 1) { // Need to fail if there is ambiguity, i.e. 
more than one field is duplicate val typesString = types.map(_.getName).mkString("[", ", ", "]") - throw new RuntimeException(s"Found duplicate field(s):" + + throw new RuntimeException(s"Found duplicate field(s): " + s"$typesString in case-insensitive mode") } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala index 4aa6afd69620..4564122b5356 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala @@ -435,40 +435,56 @@ class FileBasedDataSourceSuite extends QueryTest with SharedSQLContext with Befo withTempDir { dir => val format = "parquet" val tableDir = dir.getCanonicalPath + s"/$format" - val tableName = s"spark_25132_${format}" + val tableName = s"spark_25132_$format" withTable(tableName) { - val end = 5 - val data = spark.range(end).selectExpr("id as A", "id * 2 as b", "id * 3 as B") + val count = 5 + val data = spark.range(count).selectExpr("id as a") + data.write.format(format).mode("overwrite").save(tableDir) + sql(s"CREATE TABLE $tableName (A LONG) USING $format LOCATION '$tableDir'") + + withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") { + checkAnswer(sql(s"select a from $tableName"), data.select("a")) + checkAnswer(sql(s"select A from $tableName"), data.select("a")) + } + withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") { - data.write.format(format).mode("overwrite").save(tableDir) + val e = intercept[AnalysisException] { + sql(s"select a from $tableName").collect() + } + assert(e.getMessage.contains( + "cannot resolve '`a`' given input columns: [default.spark_25132_parquet.A]")) + checkAnswer(sql(s"select A from $tableName"), (0 until count).map(_ => Row(null))) } - sql(s"CREATE TABLE $tableName (a LONG, b LONG) USING $format LOCATION '$tableDir'") + } + } + } - withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") { - checkAnswer(sql(s"select a from $tableName"), data.select("A")) - checkAnswer(sql(s"select A from $tableName"), data.select("A")) + test("SPARK-25207: exception when duplicate fields in case-insensitive mode") { + withTempPath { dir => + val tableName = "spark_25207" + val tableDir = dir.getAbsoluteFile + "/table" + withTable(tableName) { + withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") { + spark.range(10).selectExpr("id as A", "id as B", "id as b") + .write.mode("overwrite").parquet(tableDir) + } + sql( + s""" + |CREATE TABLE $tableName (A LONG, B LONG) USING PARQUET LOCATION '$tableDir' + """.stripMargin) - // RuntimeException is triggered at executor side, which is then wrapped as - // SparkException at driver side + withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") { val e1 = intercept[SparkException] { - sql(s"select b from $tableName").collect() + sql(s"select a from $tableName").collect() } - assert( - e1.getCause.isInstanceOf[RuntimeException] && - e1.getCause.getMessage.contains( - """Found duplicate field(s) "b": [b, B] in case-insensitive mode""")) + assert(e1.getCause.isInstanceOf[RuntimeException] && e1.getCause.getMessage.contains( + """Found duplicate field(s): [B, b] in case-insensitive mode""")) + val e2 = intercept[SparkException] { - sql(s"select B from $tableName").collect() + sql(s"select a from $tableName where b > 0").collect() } - assert( - e2.getCause.isInstanceOf[RuntimeException] && - e2.getCause.getMessage.contains( - """Found duplicate field(s) "b": [b, B] in case-insensitive mode""")) - } - - 
withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") { - checkAnswer(sql(s"select a from $tableName"), (0 until end).map(_ => Row(null))) - checkAnswer(sql(s"select b from $tableName"), data.select("b")) + assert(e2.getCause.isInstanceOf[RuntimeException] && e2.getCause.getMessage.contains( + """Found duplicate field(s): [B, b] in case-insensitive mode""")) } } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala index 5e0a73891eae..aef66963d599 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala @@ -1095,39 +1095,6 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex FilterApi.eq(intColumn("cint"), 10: Integer), FilterApi.eq(intColumn("cint"), 20: Integer)), sources.In("CINT", Array(10, 20))) - - val dupFieldSchema = StructType( - Seq(StructField("cint", IntegerType), StructField("cINT", IntegerType))) - val dupParquetSchema = new SparkToParquetSchemaConverter(conf).convert(dupFieldSchema) - assertResult(None) { - caseInsensitiveParquetFilters.createFilter( - dupParquetSchema, sources.EqualTo("CINT", 1000)) - } - } - - test("SPARK-25207: exception when duplicate fields in case-insensitive mode") { - withTempPath { dir => - val tableName = "spark_25207" - val tableDir = dir.getAbsoluteFile + "/table" - withTable(tableName) { - withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") { - spark.range(10).selectExpr("id as A", "id as B", "id as b") - .write.mode("overwrite").parquet(tableDir) - } - sql( - s""" - |CREATE TABLE $tableName (A LONG, B LONG) USING PARQUET LOCATION '$tableDir' - """.stripMargin) - - withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") { - val e = intercept[SparkException] { - sql(s"select a from $tableName where b > 0").collect() - } - assert(e.getCause.isInstanceOf[RuntimeException] && e.getCause.getMessage.contains( - """Found duplicate field(s) "B": [B, b] in case-insensitive mode""")) - } - } - } } } From cb03fb737f1e556792c9284cd5102148a91fa483 Mon Sep 17 00:00:00 2001 From: yucai Date: Thu, 30 Aug 2018 08:54:24 +0800 Subject: [PATCH 10/16] modify ParuqetSchemaSuite --- .../parquet/ParquetSchemaSuite.scala | 22 ------------------- 1 file changed, 22 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala index 7eefedb8ff5b..eca9f044f237 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala @@ -1573,26 +1573,4 @@ class ParquetSchemaSuite extends ParquetSchemaTest { |} """.stripMargin, caseSensitive = false) - - test("Clipping - case-insensitive resolution: more than one field is matched") { - val parquetSchema = - """message root { - | required group A { - | optional int32 B; - | } - | optional int32 c; - | optional int32 a; - |} - """.stripMargin - val catalystSchema = { - val nestedType = new StructType().add("b", IntegerType, nullable = true) - new StructType() - .add("a", nestedType, nullable = true) - .add("c", IntegerType, nullable = true) - } - assertThrows[RuntimeException] { - 
ParquetReadSupport.clipParquetSchema( - MessageTypeParser.parseMessageType(parquetSchema), catalystSchema, caseSensitive = false) - } - } } From 86a0cb04032c55a5f99c19cc8ee2b211ce36461d Mon Sep 17 00:00:00 2001 From: yucai Date: Thu, 30 Aug 2018 10:27:27 +0800 Subject: [PATCH 11/16] Revert "modify ParuqetSchemaSuite" This reverts commit cb03fb737f1e556792c9284cd5102148a91fa483. --- .../parquet/ParquetSchemaSuite.scala | 22 +++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala index eca9f044f237..7eefedb8ff5b 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala @@ -1573,4 +1573,26 @@ class ParquetSchemaSuite extends ParquetSchemaTest { |} """.stripMargin, caseSensitive = false) + + test("Clipping - case-insensitive resolution: more than one field is matched") { + val parquetSchema = + """message root { + | required group A { + | optional int32 B; + | } + | optional int32 c; + | optional int32 a; + |} + """.stripMargin + val catalystSchema = { + val nestedType = new StructType().add("b", IntegerType, nullable = true) + new StructType() + .add("a", nestedType, nullable = true) + .add("c", IntegerType, nullable = true) + } + assertThrows[RuntimeException] { + ParquetReadSupport.clipParquetSchema( + MessageTypeParser.parseMessageType(parquetSchema), catalystSchema, caseSensitive = false) + } + } } From 29a580485da28ed8e5f23654f61024b2dba76336 Mon Sep 17 00:00:00 2001 From: yucai Date: Thu, 30 Aug 2018 10:27:42 +0800 Subject: [PATCH 12/16] Revert "improve uts" This reverts commit 9142f4925f5c127fe790c02bc8525e415aa1a60b. --- .../parquet/ParquetFileFormat.scala | 6 +- .../spark/sql/FileBasedDataSourceSuite.scala | 66 +++++++------------ .../parquet/ParquetFilterSuite.scala | 33 ++++++++++ 3 files changed, 62 insertions(+), 43 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala index 27309843440e..2dd5d8a61cd5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala @@ -311,6 +311,9 @@ class ParquetFileFormat hadoopConf.set( SQLConf.SESSION_LOCAL_TIMEZONE.key, sparkSession.sessionState.conf.sessionLocalTimeZone) + hadoopConf.setBoolean( + SQLConf.CASE_SENSITIVE.key, + sparkSession.sessionState.conf.caseSensitiveAnalysis) ParquetWriteSupport.setSchema(requiredSchema, hadoopConf) @@ -346,7 +349,6 @@ class ParquetFileFormat val pushDownStringStartWith = sqlConf.parquetFilterPushDownStringStartWith val pushDownInFilterThreshold = sqlConf.parquetFilterPushDownInFilterThreshold val isCaseSensitive = sqlConf.caseSensitiveAnalysis - hadoopConf.setBoolean(SQLConf.CASE_SENSITIVE.key, isCaseSensitive) (file: PartitionedFile) => { assert(file.partitionValues.numFields == partitionSchema.size) @@ -377,7 +379,7 @@ class ParquetFileFormat if (types.size > 1) { // Need to fail if there is ambiguity, i.e. 
more than one field is duplicate val typesString = types.map(_.getName).mkString("[", ", ", "]") - throw new RuntimeException(s"Found duplicate field(s): " + + throw new RuntimeException(s"Found duplicate field(s):" + s"$typesString in case-insensitive mode") } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala index 4564122b5356..4aa6afd69620 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala @@ -435,56 +435,40 @@ class FileBasedDataSourceSuite extends QueryTest with SharedSQLContext with Befo withTempDir { dir => val format = "parquet" val tableDir = dir.getCanonicalPath + s"/$format" - val tableName = s"spark_25132_$format" + val tableName = s"spark_25132_${format}" withTable(tableName) { - val count = 5 - val data = spark.range(count).selectExpr("id as a") - data.write.format(format).mode("overwrite").save(tableDir) - sql(s"CREATE TABLE $tableName (A LONG) USING $format LOCATION '$tableDir'") - - withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") { - checkAnswer(sql(s"select a from $tableName"), data.select("a")) - checkAnswer(sql(s"select A from $tableName"), data.select("a")) - } - + val end = 5 + val data = spark.range(end).selectExpr("id as A", "id * 2 as b", "id * 3 as B") withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") { - val e = intercept[AnalysisException] { - sql(s"select a from $tableName").collect() - } - assert(e.getMessage.contains( - "cannot resolve '`a`' given input columns: [default.spark_25132_parquet.A]")) - checkAnswer(sql(s"select A from $tableName"), (0 until count).map(_ => Row(null))) + data.write.format(format).mode("overwrite").save(tableDir) } - } - } - } - - test("SPARK-25207: exception when duplicate fields in case-insensitive mode") { - withTempPath { dir => - val tableName = "spark_25207" - val tableDir = dir.getAbsoluteFile + "/table" - withTable(tableName) { - withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") { - spark.range(10).selectExpr("id as A", "id as B", "id as b") - .write.mode("overwrite").parquet(tableDir) - } - sql( - s""" - |CREATE TABLE $tableName (A LONG, B LONG) USING PARQUET LOCATION '$tableDir' - """.stripMargin) + sql(s"CREATE TABLE $tableName (a LONG, b LONG) USING $format LOCATION '$tableDir'") withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") { + checkAnswer(sql(s"select a from $tableName"), data.select("A")) + checkAnswer(sql(s"select A from $tableName"), data.select("A")) + + // RuntimeException is triggered at executor side, which is then wrapped as + // SparkException at driver side val e1 = intercept[SparkException] { - sql(s"select a from $tableName").collect() + sql(s"select b from $tableName").collect() } - assert(e1.getCause.isInstanceOf[RuntimeException] && e1.getCause.getMessage.contains( - """Found duplicate field(s): [B, b] in case-insensitive mode""")) - + assert( + e1.getCause.isInstanceOf[RuntimeException] && + e1.getCause.getMessage.contains( + """Found duplicate field(s) "b": [b, B] in case-insensitive mode""")) val e2 = intercept[SparkException] { - sql(s"select a from $tableName where b > 0").collect() + sql(s"select B from $tableName").collect() } - assert(e2.getCause.isInstanceOf[RuntimeException] && e2.getCause.getMessage.contains( - """Found duplicate field(s): [B, b] in case-insensitive mode""")) + assert( + e2.getCause.isInstanceOf[RuntimeException] && + 
e2.getCause.getMessage.contains( + """Found duplicate field(s) "b": [b, B] in case-insensitive mode""")) + } + + withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") { + checkAnswer(sql(s"select a from $tableName"), (0 until end).map(_ => Row(null))) + checkAnswer(sql(s"select b from $tableName"), data.select("b")) } } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala index aef66963d599..5e0a73891eae 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala @@ -1095,6 +1095,39 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex FilterApi.eq(intColumn("cint"), 10: Integer), FilterApi.eq(intColumn("cint"), 20: Integer)), sources.In("CINT", Array(10, 20))) + + val dupFieldSchema = StructType( + Seq(StructField("cint", IntegerType), StructField("cINT", IntegerType))) + val dupParquetSchema = new SparkToParquetSchemaConverter(conf).convert(dupFieldSchema) + assertResult(None) { + caseInsensitiveParquetFilters.createFilter( + dupParquetSchema, sources.EqualTo("CINT", 1000)) + } + } + + test("SPARK-25207: exception when duplicate fields in case-insensitive mode") { + withTempPath { dir => + val tableName = "spark_25207" + val tableDir = dir.getAbsoluteFile + "/table" + withTable(tableName) { + withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") { + spark.range(10).selectExpr("id as A", "id as B", "id as b") + .write.mode("overwrite").parquet(tableDir) + } + sql( + s""" + |CREATE TABLE $tableName (A LONG, B LONG) USING PARQUET LOCATION '$tableDir' + """.stripMargin) + + withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") { + val e = intercept[SparkException] { + sql(s"select a from $tableName where b > 0").collect() + } + assert(e.getCause.isInstanceOf[RuntimeException] && e.getCause.getMessage.contains( + """Found duplicate field(s) "B": [B, b] in case-insensitive mode""")) + } + } + } } } From db49461181a11eb22ddc87a1a9bc6c72c3a10728 Mon Sep 17 00:00:00 2001 From: yucai Date: Thu, 30 Aug 2018 10:28:01 +0800 Subject: [PATCH 13/16] Revert "dedup first" This reverts commit 5b2bd931cfce64dc4aaaba81b396ffeb5bd5a1f0. 
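
For context, the getFieldMap logic restored below skips pushdown for any column whose
lower-cased name matches more than one Parquet field; such duplicates are left to fail
later at read time (see SPARK-25132). A minimal sketch of that dedup step, using plain
Scala collections and made-up field names in place of the real Parquet types (the
restored code additionally wraps the result in Spark's CaseInsensitiveMap and keeps
ParquetField values rather than strings):

    import java.util.Locale

    object DedupFieldsSketch {
      def main(args: Array[String]): Unit = {
        // Hypothetical (column name -> Parquet physical type) pairs read from a file schema.
        val primitiveFields: Seq[(String, String)] =
          Seq("A" -> "int64", "b" -> "int64", "B" -> "int64", "c" -> "int32")

        // Group by the lower-cased name and keep only groups with exactly one field;
        // "b" and "B" collide case-insensitively, so neither is eligible for pushdown.
        val dedupFields: Map[String, String] = primitiveFields
          .groupBy { case (name, _) => name.toLowerCase(Locale.ROOT) }
          .filter { case (_, fields) => fields.size == 1 }
          .map { case (lowered, fields) => lowered -> fields.head._2 }

        // Prints something like: Map(a -> int64, c -> int32)
        println(dedupFields)
      }
    }

When case-sensitive analysis is enabled, this grouping step is skipped and the field
names are used as-is.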
--- .../parquet/ParquetFileFormat.scala | 28 ++------------- .../datasources/parquet/ParquetFilters.scala | 16 ++++++--- .../parquet/ParquetReadSupport.scala | 36 ++++++++++++++----- .../parquet/ParquetFilterSuite.scala | 2 +- 4 files changed, 43 insertions(+), 39 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala index 2dd5d8a61cd5..ea4f1592a7c2 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala @@ -19,7 +19,6 @@ package org.apache.spark.sql.execution.datasources.parquet import java.io.IOException import java.net.URI -import java.util.Locale import scala.collection.JavaConverters._ import scala.collection.mutable @@ -37,7 +36,7 @@ import org.apache.parquet.hadoop._ import org.apache.parquet.hadoop.ParquetOutputFormat.JobSummaryLevel import org.apache.parquet.hadoop.codec.CodecConfig import org.apache.parquet.hadoop.util.ContextUtil -import org.apache.parquet.schema.{GroupType, MessageType} +import org.apache.parquet.schema.MessageType import org.apache.spark.{SparkException, TaskContext} import org.apache.spark.internal.Logging @@ -368,32 +367,11 @@ class ParquetFileFormat val sharedConf = broadcastedHadoopConf.value.value - val footerFileMetaData = + lazy val footerFileMetaData = ParquetFileReader.readFooter(sharedConf, filePath, SKIP_ROW_GROUPS).getFileMetaData - val parquetSchema = footerFileMetaData.getSchema - - def checkDuplicateFields(parquetRecord: GroupType): Unit = { - val fields = parquetRecord.getFields.asScala - val fieldMap = fields.groupBy(_.getName.toLowerCase(Locale.ROOT)) - fieldMap.foreach { case (_, types) => - if (types.size > 1) { - // Need to fail if there is ambiguity, i.e. more than one field is duplicate - val typesString = types.map(_.getName).mkString("[", ", ", "]") - throw new RuntimeException(s"Found duplicate field(s):" + - s"$typesString in case-insensitive mode") - } - } - - fields.filter(!_.isPrimitive).foreach { groupField => - checkDuplicateFields(groupField.asGroupType()) - } - } - if (!isCaseSensitive) { - checkDuplicateFields(parquetSchema) - } - // Try to push down filters when filter push-down is enabled. val pushed = if (enableParquetFilterPushDown) { + val parquetSchema = footerFileMetaData.getSchema val parquetFilters = new ParquetFilters(pushDownDate, pushDownTimestamp, pushDownDecimal, pushDownStringStartWith, pushDownInFilterThreshold, isCaseSensitive) filters diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala index a55abe5c46a4..3c6703767199 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala @@ -364,16 +364,24 @@ private[parquet] class ParquetFilters( // Here we don't flatten the fields in the nested schema but just look up through // root fields. Currently, accessing to nested fields does not push down filters // and it does not support to create filters for them. 
- val primitiveFieldMap = + val primitiveFields = dataType.getFields.asScala.filter(_.isPrimitive).map(_.asPrimitiveType()).map { f => f.getName -> ParquetField(f.getName, ParquetSchemaType(f.getOriginalType, f.getPrimitiveTypeName, f.getTypeLength, f.getDecimalMetadata)) - }.toMap + } if (caseSensitive) { - primitiveFieldMap + primitiveFields.toMap } else { - CaseInsensitiveMap(primitiveFieldMap) + // Don't consider ambiguity here, i.e. more than one field is matched in case insensitive + // mode, just skip pushdown for these fields, they will trigger Exception when reading, + // See: SPARK-25132. + val dedupPrimitiveFields = + primitiveFields + .groupBy(_._1.toLowerCase(Locale.ROOT)) + .filter(_._2.size == 1) + .mapValues(_.head._2) + CaseInsensitiveMap(dedupPrimitiveFields) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala index 0846432d6484..3319e73f2b31 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala @@ -30,7 +30,6 @@ import org.apache.parquet.schema.Type.Repetition import org.apache.spark.internal.Logging import org.apache.spark.sql.catalyst.expressions.UnsafeRow -import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ @@ -293,14 +292,33 @@ private[parquet] object ParquetReadSupport { private def clipParquetGroupFields( parquetRecord: GroupType, structType: StructType, caseSensitive: Boolean): Seq[Type] = { val toParquet = new SparkToParquetSchemaConverter(writeLegacyParquetFormat = false) - val fieldMap = parquetRecord.getFields.asScala.map(f => f.getName -> f).toMap - val finalParquetFieldMap = - if (caseSensitive) fieldMap else CaseInsensitiveMap(fieldMap) - structType.map { f => - finalParquetFieldMap - .get(f.name) - .map(clipParquetType(_, f.dataType, caseSensitive)) - .getOrElse(toParquet.convertField(f)) + if (caseSensitive) { + val caseSensitiveParquetFieldMap = + parquetRecord.getFields.asScala.map(f => f.getName -> f).toMap + structType.map { f => + caseSensitiveParquetFieldMap + .get(f.name) + .map(clipParquetType(_, f.dataType, caseSensitive)) + .getOrElse(toParquet.convertField(f)) + } + } else { + // Do case-insensitive resolution only if in case-insensitive mode + val caseInsensitiveParquetFieldMap = + parquetRecord.getFields.asScala.groupBy(_.getName.toLowerCase(Locale.ROOT)) + structType.map { f => + caseInsensitiveParquetFieldMap + .get(f.name.toLowerCase(Locale.ROOT)) + .map { parquetTypes => + if (parquetTypes.size > 1) { + // Need to fail if there is ambiguity, i.e. 
more than one field is matched + val parquetTypesString = parquetTypes.map(_.getName).mkString("[", ", ", "]") + throw new RuntimeException(s"""Found duplicate field(s) "${f.name}": """ + + s"$parquetTypesString in case-insensitive mode") + } else { + clipParquetType(parquetTypes.head, f.dataType, caseSensitive) + } + }.getOrElse(toParquet.convertField(f)) + } } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala index 5e0a73891eae..27fede71685f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala @@ -1106,7 +1106,7 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex } test("SPARK-25207: exception when duplicate fields in case-insensitive mode") { - withTempPath { dir => + withTempDir { dir => val tableName = "spark_25207" val tableDir = dir.getAbsoluteFile + "/table" withTable(tableName) { From 04b88c579afb975fb0d4c8a8c4e5ff54f3c27ce2 Mon Sep 17 00:00:00 2001 From: yucai Date: Thu, 30 Aug 2018 10:33:19 +0800 Subject: [PATCH 14/16] address comments --- .../sql/execution/datasources/parquet/ParquetFilterSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala index 27fede71685f..5e0a73891eae 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala @@ -1106,7 +1106,7 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex } test("SPARK-25207: exception when duplicate fields in case-insensitive mode") { - withTempDir { dir => + withTempPath { dir => val tableName = "spark_25207" val tableDir = dir.getAbsoluteFile + "/table" withTable(tableName) { From 41a7b8385972ef504af7f88954a9e4ccecec6885 Mon Sep 17 00:00:00 2001 From: yucai Date: Thu, 30 Aug 2018 14:14:52 +0800 Subject: [PATCH 15/16] improve test --- .../execution/datasources/parquet/ParquetFilterSuite.scala | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala index 5e0a73891eae..7ebb75009555 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala @@ -1107,11 +1107,12 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex test("SPARK-25207: exception when duplicate fields in case-insensitive mode") { withTempPath { dir => + val count = 10 val tableName = "spark_25207" val tableDir = dir.getAbsoluteFile + "/table" withTable(tableName) { withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") { - spark.range(10).selectExpr("id as A", "id as B", "id as b") + spark.range(count).selectExpr("id as A", "id as B", "id as b") .write.mode("overwrite").parquet(tableDir) } sql( @@ -1126,6 
+1127,10 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex assert(e.getCause.isInstanceOf[RuntimeException] && e.getCause.getMessage.contains( """Found duplicate field(s) "B": [B, b] in case-insensitive mode""")) } + + withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") { + checkAnswer(sql(s"select A from $tableName where B > 0"), (1 until count).map(Row(_))) + } } } } From e0d61969b13bcfd9dfc95e2a013b14e111d2b832 Mon Sep 17 00:00:00 2001 From: yucai Date: Fri, 31 Aug 2018 11:28:29 +0800 Subject: [PATCH 16/16] address comments --- .../execution/datasources/parquet/ParquetFilters.scala | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala index 3c6703767199..0c286defb940 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala @@ -48,10 +48,14 @@ private[parquet] class ParquetFilters( pushDownInFilterThreshold: Int, caseSensitive: Boolean) { + /** + * Holds a single field information stored in the underlying parquet file. + * + * @param fieldName field name in parquet file + * @param fieldType field type related info in parquet file + */ private case class ParquetField( - // field name in parquet file fieldName: String, - // field type related info in parquet file fieldType: ParquetSchemaType) private case class ParquetSchemaType(