OrcUtils.scala (org.apache.spark.sql.execution.datasources.orc)

@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.execution.datasources.orc
 
+import java.util.Locale
+
 import scala.collection.JavaConverters._
 
 import org.apache.hadoop.conf.Configuration
@@ -27,7 +29,7 @@ import org.apache.spark.SparkException
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.analysis.{caseInsensitiveResolution, caseSensitiveResolution}
+import org.apache.spark.sql.catalyst.analysis.caseSensitiveResolution
 import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
 import org.apache.spark.sql.types._
 
@@ -115,8 +117,29 @@ object OrcUtils extends Logging {
         }
       })
     } else {
-      val resolver = if (isCaseSensitive) caseSensitiveResolution else caseInsensitiveResolution
-      Some(requiredSchema.fieldNames.map { name => orcFieldNames.indexWhere(resolver(_, name)) })
+      if (isCaseSensitive) {
+        Some(requiredSchema.fieldNames.map { name =>
+          orcFieldNames.indexWhere(caseSensitiveResolution(_, name))
+        })
+      } else {
+        // Do case-insensitive resolution only if in case-insensitive mode
+        val caseInsensitiveOrcFieldMap =
+          orcFieldNames.zipWithIndex.groupBy(_._1.toLowerCase(Locale.ROOT))
+        Some(requiredSchema.fieldNames.map { requiredFieldName =>
+          caseInsensitiveOrcFieldMap
+            .get(requiredFieldName.toLowerCase(Locale.ROOT))
+            .map { matchedOrcFields =>
+              if (matchedOrcFields.size > 1) {
+                // Need to fail if there is ambiguity, i.e. more than one field is matched.
+                val matchedOrcFieldsString = matchedOrcFields.map(_._1).mkString("[", ", ", "]")
+                throw new RuntimeException(s"""Found duplicate field(s) "$requiredFieldName": """
+                  + s"$matchedOrcFieldsString in case-insensitive mode")
+              } else {
+                matchedOrcFields.head._2
+              }
+            }.getOrElse(-1)
+        })
+      }
     }
   }
 }
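
The heart of the change is the groupBy-based lookup: the physical ORC field names are grouped by their lower-cased form, so a required field name either resolves to exactly one column index, to several (an ambiguity, which now fails fast instead of silently taking the first indexWhere match), or to none (index -1, which the reader surfaces as null values). Below is a minimal standalone sketch of the same technique, runnable outside Spark; the object name, the sample field names, and the resolve helper are illustrative, not part of the patch.

    import java.util.Locale

    object CaseInsensitiveLookupSketch {
      // Hypothetical physical schema: two columns differ only by case.
      val orcFieldNames = Seq("A", "b", "B")

      // Group (name, index) pairs by lower-cased name once, mirroring the patch.
      private val byLowerCase: Map[String, Seq[(String, Int)]] =
        orcFieldNames.zipWithIndex.groupBy(_._1.toLowerCase(Locale.ROOT))

      // Returns the column index for `name`, -1 if absent; throws on ambiguity.
      def resolve(name: String): Int = {
        byLowerCase.get(name.toLowerCase(Locale.ROOT)).map { matched =>
          if (matched.size > 1) {
            val fields = matched.map(_._1).mkString("[", ", ", "]")
            throw new RuntimeException(
              s"""Found duplicate field(s) "$name": $fields in case-insensitive mode""")
          } else {
            matched.head._2
          }
        }.getOrElse(-1)
      }

      def main(args: Array[String]): Unit = {
        println(resolve("a")) // 0: unique match on "A"
        println(resolve("c")) // -1: no such column
        println(resolve("b")) // throws: matches both "b" and "B"
      }
    }

One design note: the map is built once per read, so each required field resolves in constant time, whereas the old indexWhere scan was linear per field and, more importantly, silently returned the first case-insensitive match even when several columns collided.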

FileBasedDataSourceSuite.scala

@@ -431,44 +431,45 @@ class FileBasedDataSourceSuite extends QueryTest with SharedSQLContext with Befo
     }
   }
 
test(s"SPARK-25132: case-insensitive field resolution when reading from Parquet") {
withTempDir { dir =>
val format = "parquet"
val tableDir = dir.getCanonicalPath + s"/$format"
val tableName = s"spark_25132_${format}"
withTable(tableName) {
val end = 5
val data = spark.range(end).selectExpr("id as A", "id * 2 as b", "id * 3 as B")
withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") {
data.write.format(format).mode("overwrite").save(tableDir)
}
sql(s"CREATE TABLE $tableName (a LONG, b LONG) USING $format LOCATION '$tableDir'")

withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") {
checkAnswer(sql(s"select a from $tableName"), data.select("A"))
checkAnswer(sql(s"select A from $tableName"), data.select("A"))

// RuntimeException is triggered at executor side, which is then wrapped as
// SparkException at driver side
val e1 = intercept[SparkException] {
sql(s"select b from $tableName").collect()
Seq("parquet", "orc").foreach { format =>
test(s"Spark native readers should respect spark.sql.caseSensitive - ${format}") {
withTempDir { dir =>
val tableName = s"spark_25132_${format}_native"
val tableDir = dir.getCanonicalPath + s"/$tableName"
withTable(tableName) {
val end = 5
val data = spark.range(end).selectExpr("id as A", "id * 2 as b", "id * 3 as B")
withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") {
data.write.format(format).mode("overwrite").save(tableDir)
}
assert(
e1.getCause.isInstanceOf[RuntimeException] &&
e1.getCause.getMessage.contains(
"""Found duplicate field(s) "b": [b, B] in case-insensitive mode"""))
val e2 = intercept[SparkException] {
sql(s"select B from $tableName").collect()
sql(s"CREATE TABLE $tableName (a LONG, b LONG) USING $format LOCATION '$tableDir'")

withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") {
checkAnswer(sql(s"select a from $tableName"), data.select("A"))
checkAnswer(sql(s"select A from $tableName"), data.select("A"))

// RuntimeException is triggered at executor side, which is then wrapped as
// SparkException at driver side
val e1 = intercept[SparkException] {
sql(s"select b from $tableName").collect()
}
assert(
e1.getCause.isInstanceOf[RuntimeException] &&
e1.getCause.getMessage.contains(
"""Found duplicate field(s) "b": [b, B] in case-insensitive mode"""))
val e2 = intercept[SparkException] {
sql(s"select B from $tableName").collect()
}
assert(
e2.getCause.isInstanceOf[RuntimeException] &&
e2.getCause.getMessage.contains(
"""Found duplicate field(s) "b": [b, B] in case-insensitive mode"""))
}
assert(
e2.getCause.isInstanceOf[RuntimeException] &&
e2.getCause.getMessage.contains(
"""Found duplicate field(s) "b": [b, B] in case-insensitive mode"""))
}

withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") {
checkAnswer(sql(s"select a from $tableName"), (0 until end).map(_ => Row(null)))
checkAnswer(sql(s"select b from $tableName"), data.select("b"))
withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") {
checkAnswer(sql(s"select a from $tableName"), (0 until end).map(_ => Row(null)))
checkAnswer(sql(s"select b from $tableName"), data.select("b"))
}
}
}
}
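
For reference, the failure mode these tests pin down can be reproduced in a spark-shell session roughly as follows. This is a sketch, not part of the patch: the output path is illustrative, it assumes the default native ORC reader, and the explicit read schema plays the role of the test's CREATE TABLE ... LOCATION. Writing with spark.sql.caseSensitive=true keeps both "b" and "B" in the file schema; reading a lower-case projection back in case-insensitive mode then hits the ambiguity check.

    // Write a file whose schema contains columns differing only by case.
    spark.conf.set("spark.sql.caseSensitive", "true")
    spark.range(5).selectExpr("id as A", "id * 2 as b", "id * 3 as B")
      .write.format("orc").mode("overwrite").save("/tmp/case_sensitivity_demo")

    // Read it back case-insensitively with an explicit lower-case schema.
    spark.conf.set("spark.sql.caseSensitive", "false")
    val df = spark.read.schema("a LONG, b LONG").format("orc")
      .load("/tmp/case_sensitivity_demo")
    df.select("a").collect() // fine: "a" matches only column "A"
    df.select("b").collect() // SparkException wrapping RuntimeException:
                             //   Found duplicate field(s) "b": [b, B] in case-insensitive mode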