@@ -17,7 +17,7 @@

package org.apache.spark.sql.execution.datasources.parquet

import java.lang.{Boolean => JBoolean, Double => JDouble, Float => JFloat, Long => JLong}
import java.lang.{Boolean => JBoolean, Byte => JByte, Double => JDouble, Float => JFloat, Long => JLong, Short => JShort}
import java.math.{BigDecimal => JBigDecimal}
import java.nio.charset.StandardCharsets.UTF_8
import java.sql.{Date, Timestamp}
@@ -613,7 +613,13 @@ class ParquetFilters(
value == null || (nameToParquetField(name).fieldType match {
case ParquetBooleanType => value.isInstanceOf[JBoolean]
case ParquetIntegerType if value.isInstanceOf[Period] => true
case ParquetByteType | ParquetShortType | ParquetIntegerType => value.isInstanceOf[Number]
case ParquetByteType | ParquetShortType | ParquetIntegerType => value match {
Member:

I think you should add a comment explaining that there are only INT32 and INT64 physical integer types in the Parquet format specification; therefore, we don't check for INT8 and/or INT16.

Member:

Or, to be conservative, we could even allow only an exact match of the value on Int? cc @cloud-fan and @wangyum

Contributor Author:

Added a comment. I also made the condition more restrictive: the value must be a java Byte/Short/Integer/Long. This excludes other Numbers such as Float/Double/BigDecimal that are generally not safe to cast to int and that the initial change didn't block. For example, a float NaN gets cast to 0 and would pass the previous check.

We can't match only on Integer because for ParquetByteType and ParquetShortType the value will typically be a Byte or a Short respectively. This also allows upcasting, where a Parquet integer type is read with a larger Spark integer type.
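
A minimal sketch of the casting point above (illustration only, not part of the patch), assuming standard JVM narrowing conversions:

// Why Float/Double values are excluded: a float NaN silently converts to 0,
// so it would slip through a plain min/max range check.
val nan = java.lang.Float.valueOf(java.lang.Float.NaN)
assert(nan.intValue() == 0)

// A java.lang.Long, by contrast, can be range-checked safely before an INT32 filter is built.
val big = java.lang.Long.valueOf(Long.MaxValue)
assert(!(big.longValue() >= Int.MinValue && big.longValue() <= Int.MaxValue))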

// Byte/Short/Int are all stored as INT32 in Parquet so filters are built using type Int.
// We don't create a filter if the value would overflow.
case _: JByte | _: JShort | _: Integer => true
case v: JLong => v.longValue() >= Int.MinValue && v.longValue() <= Int.MaxValue
Contributor:

To keep it simple, how about forbidding the Long value directly?

Contributor Author:

There are some use cases where this can be useful, assuming the Parquet file contains type INT32:

  • The user specifies a read schema using schema("col LONG").
  • The column in the table schema has type LONG.
    I don't know whether the latter can happen today but it will be possible in the near future for Delta tables as I'm looking into supporting type widening.

We could skip creating row group filters in that case but the logic is simple enough and it's going to be beneficial in the cases above.
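
As a rough sketch of what this enables (illustrative only, using the createParquetFilters helper and schema names from the tests added below): a Long filter value that fits in Int still yields a row group filter for an INT32 column, while an overflowing value does not.

// Assuming `parquetFilters` was built for a schema with an IntegerType column "cint",
// as in ParquetFilterSuite below.
val inRange = parquetFilters.createFilter(sources.LessThan("cint", JLong.valueOf(5L)))
assert(inRange.isDefined)  // 5 fits in Int, so the filter is still pushed down

val overflowing = parquetFilters.createFilter(sources.LessThan("cint", JLong.valueOf(JLong.MAX_VALUE)))
assert(overflowing.isEmpty)  // would overflow Int, so no row group filter is created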

Contributor:

Thank you for the explanation, I see the two use cases.
But users may not notice this detail and could be confused by the behavior. I think we can delay the support until Delta tables support filters on long values.
Of course, skipping row group filter creation when the value exceeds the int range looks good too.

Contributor Author:

The plan would be to support this in the Delta version that will build on top of the next Spark version, so it would be good to have it here already.

Contributor:

If Spark has already supported the long type for a few releases, we can't drop it now without perf regressions. I'm +1 for the change here.

case _ => false
}
case ParquetLongType => value.isInstanceOf[JLong] || value.isInstanceOf[Duration]
case ParquetFloatType => value.isInstanceOf[JFloat]
case ParquetDoubleType => value.isInstanceOf[JDouble]
@@ -18,6 +18,7 @@
package org.apache.spark.sql.execution.datasources.parquet

import java.io.File
import java.lang.{Double => JDouble, Float => JFloat, Long => JLong}
import java.math.{BigDecimal => JBigDecimal}
import java.nio.charset.StandardCharsets
import java.sql.{Date, Timestamp}
@@ -906,6 +907,76 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared
}
}

test("don't push down filters that would result in overflows") {
val schema = StructType(Seq(
StructField("cbyte", ByteType),
StructField("cshort", ShortType),
StructField("cint", IntegerType)
))

val parquetSchema = new SparkToParquetSchemaConverter(conf).convert(schema)
val parquetFilters = createParquetFilters(parquetSchema)

for {
column <- Seq("cbyte", "cshort", "cint")
value <- Seq(JLong.MAX_VALUE, JLong.MIN_VALUE).map(JLong.valueOf)
} {
val filters = Seq(
sources.LessThan(column, value),
sources.LessThanOrEqual(column, value),
sources.GreaterThan(column, value),
sources.GreaterThanOrEqual(column, value),
sources.EqualTo(column, value),
sources.EqualNullSafe(column, value),
sources.Not(sources.EqualTo(column, value)),
sources.In(column, Array(value))
)
for (filter <- filters) {
assert(parquetFilters.createFilter(filter).isEmpty,
s"Row group filter $filter shouldn't be pushed down.")
}
}
}

test("don't push down filters when value type doesn't match column type") {
val schema = StructType(Seq(
StructField("cbyte", ByteType),
StructField("cshort", ShortType),
StructField("cint", IntegerType),
StructField("clong", LongType),
StructField("cfloat", FloatType),
StructField("cdouble", DoubleType),
StructField("cboolean", BooleanType),
StructField("cstring", StringType),
StructField("cdate", DateType),
StructField("ctimestamp", TimestampType),
StructField("cbinary", BinaryType),
StructField("cdecimal", DecimalType(10, 0))
))

val parquetSchema = new SparkToParquetSchemaConverter(conf).convert(schema)
val parquetFilters = createParquetFilters(parquetSchema)

val filters = Seq(
sources.LessThan("cbyte", String.valueOf("1")),
sources.LessThan("cshort", JBigDecimal.valueOf(1)),
sources.LessThan("cint", JFloat.valueOf(JFloat.NaN)),
sources.LessThan("clong", String.valueOf("1")),
sources.LessThan("cfloat", JDouble.valueOf(1.0D)),
sources.LessThan("cdouble", JFloat.valueOf(1.0F)),
sources.LessThan("cboolean", String.valueOf("true")),
sources.LessThan("cstring", Integer.valueOf(1)),
sources.LessThan("cdate", Timestamp.valueOf("2018-01-01 00:00:00")),
sources.LessThan("ctimestamp", Date.valueOf("2018-01-01")),
sources.LessThan("cbinary", Integer.valueOf(1)),
sources.LessThan("cdecimal", Integer.valueOf(1234))
)
for (filter <- filters) {
assert(parquetFilters.createFilter(filter).isEmpty,
s"Row group filter $filter shouldn't be pushed down.")
}
}

test("SPARK-6554: don't push down predicates which reference partition columns") {
import testImplicits._

@@ -1095,6 +1095,26 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS
}
}

test("row group skipping doesn't overflow when reading into larger type") {
withTempPath { path =>
Seq(0).toDF("a").write.parquet(path.toString)
// The vectorized and non-vectorized readers will produce different exceptions, we don't need
// to test both as this covers row group skipping.
withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "true") {
// Reading integer 'a' as a long isn't supported. Check that an exception is raised instead
// of incorrectly skipping the single row group and producing incorrect results.
val exception = intercept[SparkException] {
spark.read
.schema("a LONG")
.parquet(path.toString)
.where(s"a < ${Long.MaxValue}")
.collect()
}
assert(exception.getCause.getCause.isInstanceOf[SchemaColumnConvertNotSupportedException])
}
}
}

test("SPARK-36825, SPARK-36852: create table with ANSI intervals") {
withTable("tbl") {
sql("create table tbl (c1 interval day, c2 interval year to month) using parquet")