From 366bb35ad62edb8e6707e65c681a5b9001cc868e Mon Sep 17 00:00:00 2001 From: seancxmao Date: Fri, 17 Aug 2018 18:06:28 +0800 Subject: [PATCH 1/2] [SPARK-25175][SQL] Field resolution should fail if there's ambiguity for ORC native reader --- .../execution/datasources/orc/OrcUtils.scala | 29 +++++++- .../spark/sql/FileBasedDataSourceSuite.scala | 71 ++++++++++--------- 2 files changed, 62 insertions(+), 38 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala index b404cfa61f41e..5e13c66bb4104 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala @@ -17,6 +17,8 @@ package org.apache.spark.sql.execution.datasources.orc +import java.util.Locale + import scala.collection.JavaConverters._ import org.apache.hadoop.conf.Configuration @@ -27,7 +29,7 @@ import org.apache.spark.SparkException import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.internal.Logging import org.apache.spark.sql.SparkSession -import org.apache.spark.sql.catalyst.analysis.{caseInsensitiveResolution, caseSensitiveResolution} +import org.apache.spark.sql.catalyst.analysis.caseSensitiveResolution import org.apache.spark.sql.catalyst.parser.CatalystSqlParser import org.apache.spark.sql.types._ @@ -115,8 +117,29 @@ object OrcUtils extends Logging { } }) } else { - val resolver = if (isCaseSensitive) caseSensitiveResolution else caseInsensitiveResolution - Some(requiredSchema.fieldNames.map { name => orcFieldNames.indexWhere(resolver(_, name)) }) + if (isCaseSensitive) { + Some(requiredSchema.fieldNames.map { name => + orcFieldNames.indexWhere(caseSensitiveResolution(_, name)) + }) + } else { + // Do case-insensitive resolution only if in case-insensitive mode + val caseInsensitiveOrcFieldMap = + orcFieldNames.zipWithIndex.groupBy(_._1.toLowerCase(Locale.ROOT)) + Some(requiredSchema.fieldNames.map { requiredFieldName => + caseInsensitiveOrcFieldMap + .get(requiredFieldName.toLowerCase(Locale.ROOT)) + .map { matchedOrcFields => + if (matchedOrcFields.size > 1) { + // Need to fail if there is ambiguity, i.e. more than one field is matched. 
+ val matchedOrcFieldsString = matchedOrcFields.map(_._1).mkString("[", ", ", "]") + throw new RuntimeException(s"""Found duplicate field(s) "$requiredFieldName": """ + + s"$matchedOrcFieldsString in case-insensitive mode") + } else { + matchedOrcFields.head._2 + } + }.getOrElse(-1) + }) + } } } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala index 4aa6afd69620b..426bf2ea1be17 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala @@ -431,44 +431,45 @@ class FileBasedDataSourceSuite extends QueryTest with SharedSQLContext with Befo } } - test(s"SPARK-25132: case-insensitive field resolution when reading from Parquet") { - withTempDir { dir => - val format = "parquet" - val tableDir = dir.getCanonicalPath + s"/$format" - val tableName = s"spark_25132_${format}" - withTable(tableName) { - val end = 5 - val data = spark.range(end).selectExpr("id as A", "id * 2 as b", "id * 3 as B") - withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") { - data.write.format(format).mode("overwrite").save(tableDir) - } - sql(s"CREATE TABLE $tableName (a LONG, b LONG) USING $format LOCATION '$tableDir'") - - withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") { - checkAnswer(sql(s"select a from $tableName"), data.select("A")) - checkAnswer(sql(s"select A from $tableName"), data.select("A")) - - // RuntimeException is triggered at executor side, which is then wrapped as - // SparkException at driver side - val e1 = intercept[SparkException] { - sql(s"select b from $tableName").collect() + Seq("parquet", "orc").foreach { format => + test(s"Spark native readers should respect spark.sql.caseSensitive - ${format}") { + withTempDir { dir => + val tableName = s"spark_25132_${format}_native" + val tableDir = dir.getCanonicalPath + s"/$tableName" + withTable(tableName) { + val end = 5 + val data = spark.range(end).selectExpr("id as A", "id * 2 as b", "id * 3 as B") + withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") { + data.write.format(format).mode("overwrite").save(tableDir) } - assert( - e1.getCause.isInstanceOf[RuntimeException] && - e1.getCause.getMessage.contains( - """Found duplicate field(s) "b": [b, B] in case-insensitive mode""")) - val e2 = intercept[SparkException] { - sql(s"select B from $tableName").collect() + sql(s"CREATE TABLE $tableName (a LONG, b LONG) USING $format LOCATION '$tableDir'") + + withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") { + checkAnswer(sql(s"select a from $tableName"), data.select("A")) + checkAnswer(sql(s"select A from $tableName"), data.select("A")) + + // RuntimeException is triggered at executor side, which is then wrapped as + // SparkException at driver side + val e1 = intercept[SparkException] { + sql(s"select b from $tableName").collect() + } + assert( + e1.getCause.isInstanceOf[RuntimeException] && + e1.getCause.getMessage.contains( + """Found duplicate field(s) "b": [b, B] in case-insensitive mode""")) + val e2 = intercept[SparkException] { + sql(s"select B from $tableName").collect() + } + assert( + e2.getCause.isInstanceOf[RuntimeException] && + e2.getCause.getMessage.contains( + """Found duplicate field(s) "b": [b, B] in case-insensitive mode""")) } - assert( - e2.getCause.isInstanceOf[RuntimeException] && - e2.getCause.getMessage.contains( - """Found duplicate field(s) "b": [b, B] in case-insensitive mode""")) - } - 
-        withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") {
-          checkAnswer(sql(s"select a from $tableName"), (0 until end).map(_ => Row(null)))
-          checkAnswer(sql(s"select b from $tableName"), data.select("b"))
+          withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") {
+            checkAnswer(sql(s"select a from $tableName"), (0 until end).map(_ => Row(null)))
+            checkAnswer(sql(s"select b from $tableName"), data.select("b"))
+          }
         }
       }
     }
   }

From 26b4710bb47e700d3532da5652fd5ea3f128be20 Mon Sep 17 00:00:00 2001
From: seancxmao
Date: Sat, 8 Sep 2018 13:18:15 +0800
Subject: [PATCH 2/2] Adjust indentation

---
 .../apache/spark/sql/execution/datasources/orc/OrcUtils.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala
index 5e13c66bb4104..a12098f78dd25 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala
@@ -137,7 +137,7 @@ object OrcUtils extends Logging {
                 } else {
                   matchedOrcFields.head._2
                 }
-                }.getOrElse(-1)
+              }.getOrElse(-1)
           })
         }
       }
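
For reference, a minimal self-contained sketch of the matching logic that PATCH 1/2 adds to OrcUtils.requestedColumnIds. This is not part of the patch series: the object name OrcFieldResolutionSketch, the standalone matchFieldId(Seq[String], String) signature, and the demo field list are illustrative assumptions; in the patch the same logic runs over the ORC file's physical field names.

import java.util.Locale

import scala.util.Try

object OrcFieldResolutionSketch {
  // Returns the index of the unique (case-insensitive) match for
  // requiredFieldName, -1 if absent, and throws when the match is ambiguous.
  def matchFieldId(orcFieldNames: Seq[String], requiredFieldName: String): Int = {
    // Group the physical field names by their lower-cased form so that one
    // requested name can be checked against every candidate at once.
    val caseInsensitiveOrcFieldMap =
      orcFieldNames.zipWithIndex.groupBy(_._1.toLowerCase(Locale.ROOT))
    caseInsensitiveOrcFieldMap
      .get(requiredFieldName.toLowerCase(Locale.ROOT))
      .map { matchedOrcFields =>
        if (matchedOrcFields.size > 1) {
          // Fail on ambiguity instead of silently picking the first match.
          val matchedOrcFieldsString = matchedOrcFields.map(_._1).mkString("[", ", ", "]")
          throw new RuntimeException(s"""Found duplicate field(s) "$requiredFieldName": """ +
            s"$matchedOrcFieldsString in case-insensitive mode")
        } else {
          matchedOrcFields.head._2
        }
      }.getOrElse(-1) // same sentinel the reader uses for a missing column
  }

  def main(args: Array[String]): Unit = {
    val fields = Seq("A", "b", "B")
    println(matchFieldId(fields, "a"))      // 0: unique case-insensitive match
    println(matchFieldId(fields, "c"))      // -1: no such field
    println(Try(matchFieldId(fields, "b"))) // Failure(...duplicate field(s) "b": [b, B]...)
  }
}

Failing eagerly on an ambiguous match mirrors the Parquet-side behavior from SPARK-25132 that the reworked test exercises, so both native readers report duplicate fields with the same error message rather than resolving to whichever field happens to come first in the file schema.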