Commit ac686f6
[SPARK-46092][SQL] Don't push down Parquet row group filters that overflow
### What changes were proposed in this pull request?

This change adds a check for overflows when creating Parquet row group filters on an INT32 (byte/short/int) Parquet type, to avoid incorrectly skipping row groups when the predicate value doesn't fit in an INT. This can happen if the read schema is specified as LONG, e.g. via `.schema("col LONG")`. While the Parquet readers don't support reading INT32 into a LONG, the overflow can lead to row groups being incorrectly skipped, bypassing the reader altogether and producing incorrect results instead of failing.

### Why are the changes needed?

Reading a Parquet file containing INT32 values with a read schema specified as LONG can produce incorrect results today:
```
Seq(0).toDF("a").write.parquet(path)
spark.read.schema("a LONG").parquet(path).where(s"a < ${Long.MaxValue}").collect()
```
will return an empty result. The correct result is either:
- failing the query if the Parquet reader doesn't support upcasting integers to longs (all Parquet readers in Spark today), or
- returning the result `[0]` if the Parquet reader supports that upcast (no readers in Spark as of now, but I'm looking into adding this capability).

### Does this PR introduce _any_ user-facing change?

The following:
```
Seq(0).toDF("a").write.parquet(path)
spark.read.schema("a LONG").parquet(path).where(s"a < ${Long.MaxValue}").collect()
```
produces an (incorrect) empty result before this change. After this change, the read will fail, raising an error about the unsupported conversion from INT to LONG in the Parquet reader.

### How was this patch tested?

- Added tests to `ParquetFilterSuite` to ensure that no row group filter is created when the predicate value overflows or when the value type isn't compatible with the Parquet type.
- Added a test to `ParquetQuerySuite` covering the correctness issue described above.

### Was this patch authored or co-authored using generative AI tooling?

No

Closes apache#44006 from johanl-db/SPARK-46092-row-group-skipping-overflow.

Authored-by: Johan Lasperas <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
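As a concrete illustration (not part of the patch): truncating `Long.MaxValue` to 32 bits wraps around to `-1`, so a pushed-down `a < Long.MaxValue` predicate is effectively evaluated as `a < -1` against a row group's INT32 statistics, and a row group whose minimum is `0` gets pruned. A minimal sketch of that arithmetic:

```scala
// Illustrative sketch only, assuming a row group whose INT32 min statistic is 0.
val predicateValue: Long = Long.MaxValue
val truncated: Int = predicateValue.toInt        // wraps to -1 (lower 32 bits are all ones)
val rowGroupMin: Int = 0
// A "min < value" check against the truncated value drops the row group.
val keepRowGroup = rowGroupMin < truncated       // false -> row group incorrectly skipped
println(s"truncated=$truncated, keepRowGroup=$keepRowGroup")
```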
1 parent b8750d5 · commit ac686f6

3 files changed: +99 -2

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala (8 additions, 2 deletions)

```diff
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.execution.datasources.parquet
 
-import java.lang.{Boolean => JBoolean, Double => JDouble, Float => JFloat, Long => JLong}
+import java.lang.{Boolean => JBoolean, Byte => JByte, Double => JDouble, Float => JFloat, Long => JLong, Short => JShort}
 import java.math.{BigDecimal => JBigDecimal}
 import java.nio.charset.StandardCharsets.UTF_8
 import java.sql.{Date, Timestamp}
@@ -612,7 +612,13 @@ class ParquetFilters(
     value == null || (nameToParquetField(name).fieldType match {
       case ParquetBooleanType => value.isInstanceOf[JBoolean]
       case ParquetIntegerType if value.isInstanceOf[Period] => true
-      case ParquetByteType | ParquetShortType | ParquetIntegerType => value.isInstanceOf[Number]
+      case ParquetByteType | ParquetShortType | ParquetIntegerType => value match {
+        // Byte/Short/Int are all stored as INT32 in Parquet so filters are built using type Int.
+        // We don't create a filter if the value would overflow.
+        case _: JByte | _: JShort | _: Integer => true
+        case v: JLong => v.longValue() >= Int.MinValue && v.longValue() <= Int.MaxValue
+        case _ => false
+      }
       case ParquetLongType => value.isInstanceOf[JLong] || value.isInstanceOf[Duration]
       case ParquetFloatType => value.isInstanceOf[JFloat]
       case ParquetDoubleType => value.isInstanceOf[JDouble]
```
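The essence of the new guard, shown as a standalone sketch with a hypothetical helper name (`fitsInParquetInt32` is not the actual `ParquetFilters` code): a predicate value only qualifies for an INT32 filter when it is a byte/short/int, or a long that fits in the 32-bit signed range.

```scala
// Minimal sketch of the bounds check above; `fitsInParquetInt32` is a hypothetical name.
def fitsInParquetInt32(value: Any): Boolean = value match {
  case _: java.lang.Byte | _: java.lang.Short | _: java.lang.Integer => true
  // A Long is only acceptable when it can be represented exactly as an Int.
  case v: java.lang.Long => v.longValue() >= Int.MinValue && v.longValue() <= Int.MaxValue
  case _ => false
}

// Values that overflow INT32 no longer produce a row group filter.
assert(fitsInParquetInt32(java.lang.Long.valueOf(42L)))
assert(!fitsInParquetInt32(java.lang.Long.valueOf(Long.MaxValue)))
```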

sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala (71 additions, 0 deletions)

```diff
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.execution.datasources.parquet
 
 import java.io.File
+import java.lang.{Double => JDouble, Float => JFloat, Long => JLong}
 import java.math.{BigDecimal => JBigDecimal}
 import java.nio.charset.StandardCharsets
 import java.sql.{Date, Timestamp}
@@ -902,6 +903,76 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared
     }
   }
 
+  test("don't push down filters that would result in overflows") {
+    val schema = StructType(Seq(
+      StructField("cbyte", ByteType),
+      StructField("cshort", ShortType),
+      StructField("cint", IntegerType)
+    ))
+
+    val parquetSchema = new SparkToParquetSchemaConverter(conf).convert(schema)
+    val parquetFilters = createParquetFilters(parquetSchema)
+
+    for {
+      column <- Seq("cbyte", "cshort", "cint")
+      value <- Seq(JLong.MAX_VALUE, JLong.MIN_VALUE).map(JLong.valueOf)
+    } {
+      val filters = Seq(
+        sources.LessThan(column, value),
+        sources.LessThanOrEqual(column, value),
+        sources.GreaterThan(column, value),
+        sources.GreaterThanOrEqual(column, value),
+        sources.EqualTo(column, value),
+        sources.EqualNullSafe(column, value),
+        sources.Not(sources.EqualTo(column, value)),
+        sources.In(column, Array(value))
+      )
+      for (filter <- filters) {
+        assert(parquetFilters.createFilter(filter).isEmpty,
+          s"Row group filter $filter shouldn't be pushed down.")
+      }
+    }
+  }
+
+  test("don't push down filters when value type doesn't match column type") {
+    val schema = StructType(Seq(
+      StructField("cbyte", ByteType),
+      StructField("cshort", ShortType),
+      StructField("cint", IntegerType),
+      StructField("clong", LongType),
+      StructField("cfloat", FloatType),
+      StructField("cdouble", DoubleType),
+      StructField("cboolean", BooleanType),
+      StructField("cstring", StringType),
+      StructField("cdate", DateType),
+      StructField("ctimestamp", TimestampType),
+      StructField("cbinary", BinaryType),
+      StructField("cdecimal", DecimalType(10, 0))
+    ))
+
+    val parquetSchema = new SparkToParquetSchemaConverter(conf).convert(schema)
+    val parquetFilters = createParquetFilters(parquetSchema)
+
+    val filters = Seq(
+      sources.LessThan("cbyte", String.valueOf("1")),
+      sources.LessThan("cshort", JBigDecimal.valueOf(1)),
+      sources.LessThan("cint", JFloat.valueOf(JFloat.NaN)),
+      sources.LessThan("clong", String.valueOf("1")),
+      sources.LessThan("cfloat", JDouble.valueOf(1.0D)),
+      sources.LessThan("cdouble", JFloat.valueOf(1.0F)),
+      sources.LessThan("cboolean", String.valueOf("true")),
+      sources.LessThan("cstring", Integer.valueOf(1)),
+      sources.LessThan("cdate", Timestamp.valueOf("2018-01-01 00:00:00")),
+      sources.LessThan("ctimestamp", Date.valueOf("2018-01-01")),
+      sources.LessThan("cbinary", Integer.valueOf(1)),
+      sources.LessThan("cdecimal", Integer.valueOf(1234))
+    )
+    for (filter <- filters) {
+      assert(parquetFilters.createFilter(filter).isEmpty,
+        s"Row group filter $filter shouldn't be pushed down.")
+    }
+  }
+
   test("SPARK-6554: don't push down predicates which reference partition columns") {
     import testImplicits._
 
```
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala (20 additions, 0 deletions)

```diff
@@ -1095,6 +1095,26 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS
     }
   }
 
+  test("row group skipping doesn't overflow when reading into larger type") {
+    withTempPath { path =>
+      Seq(0).toDF("a").write.parquet(path.toString)
+      // The vectorized and non-vectorized readers will produce different exceptions, we don't need
+      // to test both as this covers row group skipping.
+      withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "true") {
+        // Reading integer 'a' as a long isn't supported. Check that an exception is raised instead
+        // of incorrectly skipping the single row group and producing incorrect results.
+        val exception = intercept[SparkException] {
+          spark.read
+            .schema("a LONG")
+            .parquet(path.toString)
+            .where(s"a < ${Long.MaxValue}")
+            .collect()
+        }
+        assert(exception.getCause.getCause.isInstanceOf[SchemaColumnConvertNotSupportedException])
+      }
+    }
+  }
+
   test("SPARK-36825, SPARK-36852: create table with ANSI intervals") {
     withTable("tbl") {
       sql("create table tbl (c1 interval day, c2 interval year to month) using parquet")
```
