Commit aa0b0b8

Revert "[SPARK-32646][SQL] ORC predicate pushdown should work with case-insensitive analysis"
### What changes were proposed in this pull request?

This reverts commit e277ef1.

### Why are the changes needed?

Both master and branch-3.0 have a few tests failing under the hive-1.2 profile, and PR #29457 missed a change in the hive-1.2 code that causes a compilation error. That makes debugging the failed tests harder, so I'd like to revert #29457 first to unblock that work.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Unit test.

Closes #29519 from viirya/revert-SPARK-32646.

Authored-by: Liang-Chi Hsieh <[email protected]>
Signed-off-by: Liang-Chi Hsieh <[email protected]>
1 parent f258718 commit aa0b0b8

File tree

10 files changed: +60 additions, −243 deletions
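Before the per-file diffs: everything this commit touches is gated by one user-facing switch, `spark.sql.orc.filterPushdown`. A minimal, self-contained sketch (paths and column names are illustrative, not from this PR) of exercising pushdown end to end:

```scala
import org.apache.spark.sql.SparkSession

object OrcPushdownDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("orc-pushdown-demo")
      .config("spark.sql.orc.filterPushdown", "true") // default is true in Spark 3.x
      .getOrCreate()
    import spark.implicits._

    val path = "/tmp/orc-pushdown-demo" // illustrative output location
    (0 until 100).map(i => (i, s"row-$i")).toDF("id", "name")
      .write.mode("overwrite").orc(path)

    val filtered = spark.read.orc(path).where($"id" < 10)
    filtered.explain() // the scan node reports PushedFilters: [IsNotNull(id), LessThan(id,10)]
    println(filtered.count()) // 10
    spark.stop()
  }
}
```

With the revert applied, the SearchArgument for a filter like `id < 10` is built once on the driver from the Catalyst schema, rather than per file on executors.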

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala (5 additions, 11 deletions)

```diff
@@ -153,6 +153,11 @@ class OrcFileFormat
       filters: Seq[Filter],
       options: Map[String, String],
       hadoopConf: Configuration): (PartitionedFile) => Iterator[InternalRow] = {
+    if (sparkSession.sessionState.conf.orcFilterPushDown) {
+      OrcFilters.createFilter(dataSchema, filters).foreach { f =>
+        OrcInputFormat.setSearchArgument(hadoopConf, f, dataSchema.fieldNames)
+      }
+    }
 
     val resultSchema = StructType(requiredSchema.fields ++ partitionSchema.fields)
     val sqlConf = sparkSession.sessionState.conf
@@ -164,8 +169,6 @@ class OrcFileFormat
     val broadcastedConf =
       sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
     val isCaseSensitive = sparkSession.sessionState.conf.caseSensitiveAnalysis
-    val orcFilterPushDown = sparkSession.sessionState.conf.orcFilterPushDown
-    val ignoreCorruptFiles = sparkSession.sessionState.conf.ignoreCorruptFiles
 
     (file: PartitionedFile) => {
       val conf = broadcastedConf.value.value
@@ -183,15 +186,6 @@ class OrcFileFormat
       if (resultedColPruneInfo.isEmpty) {
         Iterator.empty
       } else {
-        // ORC predicate pushdown
-        if (orcFilterPushDown) {
-          OrcUtils.readCatalystSchema(filePath, conf, ignoreCorruptFiles).map { fileSchema =>
-            OrcFilters.createFilter(fileSchema, filters).foreach { f =>
-              OrcInputFormat.setSearchArgument(conf, f, fileSchema.fieldNames)
-            }
-          }
-        }
-
         val (requestedColIds, canPruneCols) = resultedColPruneInfo.get
         val resultSchemaString = OrcUtils.orcResultSchemaString(canPruneCols,
           dataSchema, resultSchema, partitionSchema, conf)
```
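For context on the restored hunk: `OrcInputFormat.setSearchArgument` serializes the filter into the Hadoop `Configuration` that Spark broadcasts to executors. A hedged sketch of that call in isolation, assuming the ORC `mapred` API and the shaded `storage-api` packages Spark's ORC build uses (mirroring the test-suite imports further down):

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.orc.mapred.OrcInputFormat
import org.apache.orc.storage.ql.io.sarg.{PredicateLeaf, SearchArgumentFactory}

object SearchArgumentSketch {
  def main(args: Array[String]): Unit = {
    // Roughly the SARG Spark builds for a pushed filter like `id < 10`.
    val sarg = SearchArgumentFactory.newBuilder()
      .startAnd()
      .lessThan("id", PredicateLeaf.Type.LONG, java.lang.Long.valueOf(10L))
      .`end`()
      .build()

    // Serializes the SARG (and the searched column names) into ORC's
    // configuration keys; Spark then ships this Configuration to executors.
    val conf = new Configuration()
    OrcInputFormat.setSearchArgument(conf, sarg, Array("id"))
    println(sarg) // prints the leaf and the overall expression
  }
}
```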

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFiltersBase.scala (3 additions, 7 deletions)

```diff
@@ -39,8 +39,6 @@ trait OrcFiltersBase {
     }
   }
 
-  case class OrcPrimitiveField(fieldName: String, fieldType: DataType)
-
   /**
    * This method returns a map which contains ORC field name and data type. Each key
    * represents a column; `dots` are used as separators for nested columns. If any part
@@ -51,21 +49,19 @@ trait OrcFiltersBase {
    */
   protected[sql] def getSearchableTypeMap(
       schema: StructType,
-      caseSensitive: Boolean): Map[String, OrcPrimitiveField] = {
+      caseSensitive: Boolean): Map[String, DataType] = {
     import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.MultipartIdentifierHelper
 
     def getPrimitiveFields(
         fields: Seq[StructField],
-        parentFieldNames: Seq[String] = Seq.empty): Seq[(String, OrcPrimitiveField)] = {
+        parentFieldNames: Seq[String] = Seq.empty): Seq[(String, DataType)] = {
       fields.flatMap { f =>
         f.dataType match {
           case st: StructType =>
             getPrimitiveFields(st.fields, parentFieldNames :+ f.name)
           case BinaryType => None
           case _: AtomicType =>
-            val fieldName = (parentFieldNames :+ f.name).quoted
-            val orcField = OrcPrimitiveField(fieldName, f.dataType)
-            Some((fieldName, orcField))
+            Some(((parentFieldNames :+ f.name).quoted, f.dataType))
           case _ => None
         }
       }
```
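The reverted `getSearchableTypeMap` is what flattens nested struct fields into the dot-separated keys the SARG builder looks up. A simplified stand-in (not the Spark-internal code: the `quoted` handling of dotted names and the internal `AtomicType` check are approximated here):

```scala
import org.apache.spark.sql.types._

object FieldFlattening {
  // Stand-in for the reverted getSearchableTypeMap: nested struct fields
  // become dot-separated keys mapped to their DataType. The real code quotes
  // name parts via CatalogV2Implicits' `quoted` (which guards dots inside
  // field names) and matches the internal AtomicType class; both are
  // approximated below.
  def primitiveFields(
      fields: Seq[StructField],
      parents: Seq[String] = Seq.empty): Seq[(String, DataType)] =
    fields.flatMap { f =>
      f.dataType match {
        case st: StructType => primitiveFields(st.fields, parents :+ f.name)
        case BinaryType => Nil // binary columns are not searchable
        case _: ArrayType | _: MapType => Nil // complex types are skipped
        case leaf => Seq(((parents :+ f.name).mkString("."), leaf))
      }
    }

  def main(args: Array[String]): Unit = {
    val schema = StructType(Seq(
      StructField("id", IntegerType),
      StructField("a", StructType(Seq(StructField("cint", IntegerType))))))
    primitiveFields(schema.fields).foreach(println) // (id,IntegerType), (a.cint,IntegerType)
  }
}
```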

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala (0 additions, 14 deletions)

```diff
@@ -92,20 +92,6 @@ object OrcUtils extends Logging {
     }
   }
 
-  def readCatalystSchema(
-      file: Path,
-      conf: Configuration,
-      ignoreCorruptFiles: Boolean): Option[StructType] = {
-    readSchema(file, conf, ignoreCorruptFiles) match {
-      case Some(schema) =>
-        Some(CatalystSqlParser.parseDataType(schema.toString).asInstanceOf[StructType])
-
-      case None =>
-        // Field names is empty or `FileFormatException` was thrown but ignoreCorruptFiles is true.
-        None
-    }
-  }
-
   /**
    * Reads ORC file schemas in multi-threaded manner, using native version of ORC.
    * This is visible for testing.
```
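The deleted helper leaned on the fact that an ORC `TypeDescription` prints in a DDL-like form that Catalyst's type parser accepts. A sketch of that round trip (`CatalystSqlParser` is a Spark-internal API; this is shown only to explain the removed code, not as a supported usage):

```scala
import org.apache.orc.TypeDescription
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
import org.apache.spark.sql.types.StructType

object OrcSchemaRoundTrip {
  def main(args: Array[String]): Unit = {
    // An ORC schema prints as e.g. "struct<a:int,b:string>", which the
    // Catalyst parser can turn back into a StructType.
    val orcSchema = TypeDescription.fromString("struct<a:int,b:string>")
    val catalystSchema =
      CatalystSqlParser.parseDataType(orcSchema.toString).asInstanceOf[StructType]
    println(catalystSchema.simpleString) // struct<a:int,b:string>
  }
}
```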

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcPartitionReaderFactory.scala (2 additions, 20 deletions)

```diff
@@ -31,10 +31,9 @@ import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.connector.read.{InputPartition, PartitionReader}
 import org.apache.spark.sql.execution.datasources.PartitionedFile
-import org.apache.spark.sql.execution.datasources.orc.{OrcColumnarBatchReader, OrcDeserializer, OrcFilters, OrcUtils}
+import org.apache.spark.sql.execution.datasources.orc.{OrcColumnarBatchReader, OrcDeserializer, OrcUtils}
 import org.apache.spark.sql.execution.datasources.v2._
 import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.sources.Filter
 import org.apache.spark.sql.types.{AtomicType, StructType}
 import org.apache.spark.sql.vectorized.ColumnarBatch
 import org.apache.spark.util.{SerializableConfiguration, Utils}
@@ -53,39 +52,24 @@
     broadcastedConf: Broadcast[SerializableConfiguration],
     dataSchema: StructType,
     readDataSchema: StructType,
-    partitionSchema: StructType,
-    filters: Array[Filter]) extends FilePartitionReaderFactory {
+    partitionSchema: StructType) extends FilePartitionReaderFactory {
   private val resultSchema = StructType(readDataSchema.fields ++ partitionSchema.fields)
   private val isCaseSensitive = sqlConf.caseSensitiveAnalysis
   private val capacity = sqlConf.orcVectorizedReaderBatchSize
-  private val orcFilterPushDown = sqlConf.orcFilterPushDown
-  private val ignoreCorruptFiles = sqlConf.ignoreCorruptFiles
 
   override def supportColumnarReads(partition: InputPartition): Boolean = {
     sqlConf.orcVectorizedReaderEnabled && sqlConf.wholeStageEnabled &&
       resultSchema.length <= sqlConf.wholeStageMaxNumFields &&
       resultSchema.forall(_.dataType.isInstanceOf[AtomicType])
   }
 
-  private def pushDownPredicates(filePath: Path, conf: Configuration): Unit = {
-    if (orcFilterPushDown) {
-      OrcUtils.readCatalystSchema(filePath, conf, ignoreCorruptFiles).map { fileSchema =>
-        OrcFilters.createFilter(fileSchema, filters).foreach { f =>
-          OrcInputFormat.setSearchArgument(conf, f, fileSchema.fieldNames)
-        }
-      }
-    }
-  }
-
   override def buildReader(file: PartitionedFile): PartitionReader[InternalRow] = {
     val conf = broadcastedConf.value.value
 
     OrcConf.IS_SCHEMA_EVOLUTION_CASE_SENSITIVE.setBoolean(conf, isCaseSensitive)
 
     val filePath = new Path(new URI(file.filePath))
 
-    pushDownPredicates(filePath, conf)
-
     val fs = filePath.getFileSystem(conf)
     val readerOptions = OrcFile.readerOptions(conf).filesystem(fs)
     val resultedColPruneInfo =
@@ -132,8 +116,6 @@
 
     val filePath = new Path(new URI(file.filePath))
 
-    pushDownPredicates(filePath, conf)
-
     val fs = filePath.getFileSystem(conf)
     val readerOptions = OrcFile.readerOptions(conf).filesystem(fs)
     val resultedColPruneInfo =
```
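The surviving `supportColumnarReads` check consults several session options; a sketch mapping them to their public config keys (values shown are the Spark 3.x defaults):

```scala
import org.apache.spark.sql.SparkSession

object VectorizedOrcConfigs {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").getOrCreate()
    spark.conf.set("spark.sql.orc.enableVectorizedReader", "true")  // sqlConf.orcVectorizedReaderEnabled
    spark.conf.set("spark.sql.codegen.wholeStage", "true")          // sqlConf.wholeStageEnabled
    spark.conf.set("spark.sql.codegen.maxFields", "100")            // sqlConf.wholeStageMaxNumFields
    spark.conf.set("spark.sql.orc.columnarReaderBatchSize", "4096") // sqlConf.orcVectorizedReaderBatchSize
    spark.stop()
  }
}
```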

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcScan.scala (1 addition, 1 deletion)

```diff
@@ -48,7 +48,7 @@ case class OrcScan(
     // The partition values are already truncated in `FileScan.partitions`.
     // We should use `readPartitionSchema` as the partition schema here.
     OrcPartitionReaderFactory(sparkSession.sessionState.conf, broadcastedConf,
-      dataSchema, readDataSchema, readPartitionSchema, pushedFilters)
+      dataSchema, readDataSchema, readPartitionSchema)
   }
 
   override def equals(obj: Any): Boolean = obj match {
```

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcScanBuilder.scala (5 additions, 0 deletions)

```diff
@@ -56,6 +56,11 @@ case class OrcScanBuilder(
 
   override def pushFilters(filters: Array[Filter]): Array[Filter] = {
     if (sparkSession.sessionState.conf.orcFilterPushDown) {
+      OrcFilters.createFilter(schema, filters).foreach { f =>
+        // The pushed filters will be set in `hadoopConf`. After that, we can simply use the
+        // changed `hadoopConf` in executors.
+        OrcInputFormat.setSearchArgument(hadoopConf, f, schema.fieldNames)
+      }
       val dataTypeMap = OrcFilters.getSearchableTypeMap(schema, SQLConf.get.caseSensitiveAnalysis)
       _pushedFilters = OrcFilters.convertibleFilters(schema, dataTypeMap, filters).toArray
     }
```
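`OrcScanBuilder` lives on the DataSource V2 path, which Spark 3.0 takes for ORC only when "orc" is removed from the V1 fallback list; a sketch for routing reads through this code path (the path below is illustrative):

```scala
import org.apache.spark.sql.SparkSession

object ForceOrcV2 {
  def main(args: Array[String]): Unit = {
    // By default spark.sql.sources.useV1SourceList includes "orc", so reads go
    // through the V1 OrcFileFormat shown earlier. Clearing it routes ORC
    // through OrcScanBuilder / OrcPartitionReaderFactory instead.
    val spark = SparkSession.builder()
      .master("local[*]")
      .config("spark.sql.sources.useV1SourceList", "")
      .getOrCreate()
    spark.read.orc("/tmp/orc-pushdown-demo").show(5) // illustrative path
    spark.stop()
  }
}
```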

sql/core/v1.2/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala (19 additions, 25 deletions)

```diff
@@ -81,7 +81,7 @@ private[sql] object OrcFilters extends OrcFiltersBase {
 
   def convertibleFilters(
       schema: StructType,
-      dataTypeMap: Map[String, OrcPrimitiveField],
+      dataTypeMap: Map[String, DataType],
       filters: Seq[Filter]): Seq[Filter] = {
     import org.apache.spark.sql.sources._
 
@@ -179,7 +179,7 @@ private[sql] object OrcFilters extends OrcFiltersBase {
    * @return the builder so far.
    */
   private def buildSearchArgument(
-      dataTypeMap: Map[String, OrcPrimitiveField],
+      dataTypeMap: Map[String, DataType],
       expression: Filter,
       builder: Builder): Builder = {
     import org.apache.spark.sql.sources._
@@ -215,7 +215,7 @@ private[sql] object OrcFilters extends OrcFiltersBase {
    * @return the builder so far.
    */
   private def buildLeafSearchArgument(
-      dataTypeMap: Map[String, OrcPrimitiveField],
+      dataTypeMap: Map[String, DataType],
       expression: Filter,
       builder: Builder): Option[Builder] = {
     def getType(attribute: String): PredicateLeaf.Type =
@@ -228,44 +228,38 @@ private[sql] object OrcFilters extends OrcFiltersBase {
     // wrapped by a "parent" predicate (`And`, `Or`, or `Not`).
     expression match {
       case EqualTo(name, value) if dataTypeMap.contains(name) =>
-        val castedValue = castLiteralValue(value, dataTypeMap(name).fieldType)
-        Some(builder.startAnd()
-          .equals(dataTypeMap(name).fieldName, getType(name), castedValue).end())
+        val castedValue = castLiteralValue(value, dataTypeMap(name))
+        Some(builder.startAnd().equals(name, getType(name), castedValue).end())
 
       case EqualNullSafe(name, value) if dataTypeMap.contains(name) =>
-        val castedValue = castLiteralValue(value, dataTypeMap(name).fieldType)
-        Some(builder.startAnd()
-          .nullSafeEquals(dataTypeMap(name).fieldName, getType(name), castedValue).end())
+        val castedValue = castLiteralValue(value, dataTypeMap(name))
+        Some(builder.startAnd().nullSafeEquals(name, getType(name), castedValue).end())
 
       case LessThan(name, value) if dataTypeMap.contains(name) =>
-        val castedValue = castLiteralValue(value, dataTypeMap(name).fieldType)
-        Some(builder.startAnd()
-          .lessThan(dataTypeMap(name).fieldName, getType(name), castedValue).end())
+        val castedValue = castLiteralValue(value, dataTypeMap(name))
+        Some(builder.startAnd().lessThan(name, getType(name), castedValue).end())
 
       case LessThanOrEqual(name, value) if dataTypeMap.contains(name) =>
-        val castedValue = castLiteralValue(value, dataTypeMap(name).fieldType)
-        Some(builder.startAnd()
-          .lessThanEquals(dataTypeMap(name).fieldName, getType(name), castedValue).end())
+        val castedValue = castLiteralValue(value, dataTypeMap(name))
+        Some(builder.startAnd().lessThanEquals(name, getType(name), castedValue).end())
 
       case GreaterThan(name, value) if dataTypeMap.contains(name) =>
-        val castedValue = castLiteralValue(value, dataTypeMap(name).fieldType)
-        Some(builder.startNot()
-          .lessThanEquals(dataTypeMap(name).fieldName, getType(name), castedValue).end())
+        val castedValue = castLiteralValue(value, dataTypeMap(name))
+        Some(builder.startNot().lessThanEquals(name, getType(name), castedValue).end())
 
       case GreaterThanOrEqual(name, value) if dataTypeMap.contains(name) =>
-        val castedValue = castLiteralValue(value, dataTypeMap(name).fieldType)
-        Some(builder.startNot()
-          .lessThan(dataTypeMap(name).fieldName, getType(name), castedValue).end())
+        val castedValue = castLiteralValue(value, dataTypeMap(name))
+        Some(builder.startNot().lessThan(name, getType(name), castedValue).end())
 
       case IsNull(name) if dataTypeMap.contains(name) =>
-        Some(builder.startAnd().isNull(dataTypeMap(name).fieldName, getType(name)).end())
+        Some(builder.startAnd().isNull(name, getType(name)).end())
 
       case IsNotNull(name) if dataTypeMap.contains(name) =>
-        Some(builder.startNot().isNull(dataTypeMap(name).fieldName, getType(name)).end())
+        Some(builder.startNot().isNull(name, getType(name)).end())
 
       case In(name, values) if dataTypeMap.contains(name) =>
-        val castedValues = values.map(v => castLiteralValue(v, dataTypeMap(name).fieldType))
-        Some(builder.startAnd().in(dataTypeMap(name).fieldName, getType(name),
+        val castedValues = values.map(v => castLiteralValue(v, dataTypeMap(name)))
+        Some(builder.startAnd().in(name, getType(name),
           castedValues.map(_.asInstanceOf[AnyRef]): _*).end())
 
       case _ => None
```
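To make the `GreaterThan` case concrete: the SARG builder has no greater-than leaf, so the code negates `lessThanEquals`. A sketch (same shaded `storage-api` packages as the test suite below) of what `cint > 1` becomes for an INT column, which ORC compares as LONG:

```scala
import org.apache.orc.storage.ql.io.sarg.{PredicateLeaf, SearchArgumentFactory}

object GreaterThanSarg {
  def main(args: Array[String]): Unit = {
    // Spark maps Catalyst IntegerType to PredicateLeaf.Type.LONG and widens
    // the literal to Long before it reaches the builder.
    val sarg = SearchArgumentFactory.newBuilder()
      .startNot()
      .lessThanEquals("cint", PredicateLeaf.Type.LONG, java.lang.Long.valueOf(1L))
      .`end`()
      .build()
    println(sarg) // leaf-0 = (LESS_THAN_EQUALS cint 1), expr = (not leaf-0)
  }
}
```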

sql/core/v1.2/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilterSuite.scala (2 additions, 68 deletions)

```diff
@@ -24,7 +24,6 @@ import java.sql.{Date, Timestamp}
 import scala.collection.JavaConverters._
 
 import org.apache.orc.storage.ql.io.sarg.{PredicateLeaf, SearchArgument}
-import org.apache.orc.storage.ql.io.sarg.SearchArgumentFactory.newBuilder
 
 import org.apache.spark.{SparkConf, SparkException}
 import org.apache.spark.sql.{AnalysisException, Column, DataFrame, Row}
@@ -587,7 +586,8 @@ class OrcFilterSuite extends OrcTest with SharedSparkSession {
       checkAnswer(sql(s"select a from $tableName"), (0 until count).map(c => Row(c - 1)))
 
       val actual = stripSparkFilter(sql(s"select a from $tableName where a < 0"))
-      assert(actual.count() == 1)
+      // TODO: ORC predicate pushdown should work under case-insensitive analysis.
+      // assert(actual.count() == 1)
     }
   }
 
@@ -606,71 +606,5 @@ class OrcFilterSuite extends OrcTest with SharedSparkSession {
       }
     }
   }
-
-  test("SPARK-32646: Case-insensitive field resolution for pushdown when reading ORC") {
-    import org.apache.spark.sql.sources._
-
-    def getOrcFilter(
-        schema: StructType,
-        filters: Seq[Filter],
-        caseSensitive: String): Option[SearchArgument] = {
-      var orcFilter: Option[SearchArgument] = None
-      withSQLConf(SQLConf.CASE_SENSITIVE.key -> caseSensitive) {
-        orcFilter =
-          OrcFilters.createFilter(schema, filters)
-      }
-      orcFilter
-    }
-
-    def testFilter(
-        schema: StructType,
-        filters: Seq[Filter],
-        expected: SearchArgument): Unit = {
-      val caseSensitiveFilters = getOrcFilter(schema, filters, "true")
-      val caseInsensitiveFilters = getOrcFilter(schema, filters, "false")
-
-      assert(caseSensitiveFilters.isEmpty)
-      assert(caseInsensitiveFilters.isDefined)
-
-      assert(caseInsensitiveFilters.get.getLeaves().size() > 0)
-      assert(caseInsensitiveFilters.get.getLeaves().size() == expected.getLeaves().size())
-      (0 until expected.getLeaves().size()).foreach { index =>
-        assert(caseInsensitiveFilters.get.getLeaves().get(index) == expected.getLeaves().get(index))
-      }
-    }
-
-    val schema1 = StructType(Seq(StructField("cint", IntegerType)))
-    testFilter(schema1, Seq(GreaterThan("CINT", 1)),
-      newBuilder.startNot()
-        .lessThanEquals("cint", OrcFilters.getPredicateLeafType(IntegerType), 1L).`end`().build())
-    testFilter(schema1, Seq(
-      And(GreaterThan("CINT", 1), EqualTo("Cint", 2))),
-      newBuilder.startAnd()
-        .startNot()
-        .lessThanEquals("cint", OrcFilters.getPredicateLeafType(IntegerType), 1L).`end`()
-        .equals("cint", OrcFilters.getPredicateLeafType(IntegerType), 2L)
-        .`end`().build())
-
-    // Nested column case
-    val schema2 = StructType(Seq(StructField("a",
-      StructType(Seq(StructField("cint", IntegerType))))))
-
-    testFilter(schema2, Seq(GreaterThan("A.CINT", 1)),
-      newBuilder.startNot()
-        .lessThanEquals("a.cint", OrcFilters.getPredicateLeafType(IntegerType), 1L).`end`().build())
-    testFilter(schema2, Seq(GreaterThan("a.CINT", 1)),
-      newBuilder.startNot()
-        .lessThanEquals("a.cint", OrcFilters.getPredicateLeafType(IntegerType), 1L).`end`().build())
-    testFilter(schema2, Seq(GreaterThan("A.cint", 1)),
-      newBuilder.startNot()
-        .lessThanEquals("a.cint", OrcFilters.getPredicateLeafType(IntegerType), 1L).`end`().build())
-    testFilter(schema2, Seq(
-      And(GreaterThan("a.CINT", 1), EqualTo("a.Cint", 2))),
-      newBuilder.startAnd()
-        .startNot()
-        .lessThanEquals("a.cint", OrcFilters.getPredicateLeafType(IntegerType), 1L).`end`()
-        .equals("a.cint", OrcFilters.getPredicateLeafType(IntegerType), 2L)
-        .`end`().build())
-  }
 }
```
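With the dedicated SPARK-32646 test gone, pushdown can still be observed from the physical plan. A hedged sketch using `FileSourceScanExec` (a Spark-internal class, but its "PushedFilters" metadata entry is what `explain()` prints for the V1 ORC scan):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.FileSourceScanExec

object InspectPushedFilters {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").getOrCreate()
    import spark.implicits._

    val path = "/tmp/orc-pushdown-demo" // illustrative path
    (0 until 100).toDF("a").write.mode("overwrite").orc(path)

    val df = spark.read.orc(path).where($"a" < 0)
    df.queryExecution.executedPlan.collectFirst {
      case scan: FileSourceScanExec => scan.metadata("PushedFilters")
    }.foreach(println) // e.g. [IsNotNull(a), LessThan(a,0)]
    spark.stop()
  }
}
```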
