ParquetFilters.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.execution.datasources.parquet
 
-import java.lang.{Boolean => JBoolean, Double => JDouble, Float => JFloat, Long => JLong}
+import java.lang.{Boolean => JBoolean, Byte => JByte, Double => JDouble, Float => JFloat, Long => JLong, Short => JShort}
 import java.math.{BigDecimal => JBigDecimal}
 import java.nio.charset.StandardCharsets.UTF_8
 import java.sql.{Date, Timestamp}

@@ -612,7 +612,13 @@ class ParquetFilters(
     value == null || (nameToParquetField(name).fieldType match {
       case ParquetBooleanType => value.isInstanceOf[JBoolean]
       case ParquetIntegerType if value.isInstanceOf[Period] => true
-      case ParquetByteType | ParquetShortType | ParquetIntegerType => value.isInstanceOf[Number]
+      case ParquetByteType | ParquetShortType | ParquetIntegerType => value match {
+        // Byte/Short/Int are all stored as INT32 in Parquet, so filters are built using type Int.
+        // We don't create a filter if the value would overflow an Int.
+        case _: JByte | _: JShort | _: Integer => true
+        case v: JLong => v.longValue() >= Int.MinValue && v.longValue() <= Int.MaxValue
+        case _ => false
+      }
       case ParquetLongType => value.isInstanceOf[JLong] || value.isInstanceOf[Duration]
       case ParquetFloatType => value.isInstanceOf[JFloat]
       case ParquetDoubleType => value.isInstanceOf[JDouble]
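
With this change, a comparison against a java.lang.Long literal is pushed down only when the
literal fits in an Int; otherwise the type check above fails and createFilter returns None, so
Spark evaluates the predicate itself. A minimal standalone sketch of the same guard, for
illustration only (canPushToInt32 is a hypothetical name, not from this patch):

  // Hypothetical standalone version of the overflow guard above.
  // Byte/Short/Int columns are all INT32 in Parquet, so a comparison value
  // is safe to push down only if it fits in an Int.
  def canPushToInt32(value: Any): Boolean = value match {
    case _: java.lang.Byte | _: java.lang.Short | _: java.lang.Integer => true
    case v: java.lang.Long => v >= Int.MinValue && v <= Int.MaxValue
    case _ => false
  }

  assert(canPushToInt32(java.lang.Integer.valueOf(42)))
  assert(canPushToInt32(java.lang.Long.valueOf(42L)))
  assert(!canPushToInt32(java.lang.Long.valueOf(Long.MaxValue))) // would overflow; keep in Spark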

ParquetFilterSuite.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.execution.datasources.parquet
 
 import java.io.File
+import java.lang.{Double => JDouble, Float => JFloat, Long => JLong}
 import java.math.{BigDecimal => JBigDecimal}
 import java.nio.charset.StandardCharsets
 import java.sql.{Date, Timestamp}

@@ -901,6 +902,76 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSparkSession
     }
   }
 
+  test("don't push down filters that would result in overflows") {
+    val schema = StructType(Seq(
+      StructField("cbyte", ByteType),
+      StructField("cshort", ShortType),
+      StructField("cint", IntegerType)
+    ))
+
+    val parquetSchema = new SparkToParquetSchemaConverter(conf).convert(schema)
+    val parquetFilters = createParquetFilters(parquetSchema)
+
+    for {
+      column <- Seq("cbyte", "cshort", "cint")
+      value <- Seq(JLong.MAX_VALUE, JLong.MIN_VALUE).map(JLong.valueOf)
+    } {
+      val filters = Seq(
+        sources.LessThan(column, value),
+        sources.LessThanOrEqual(column, value),
+        sources.GreaterThan(column, value),
+        sources.GreaterThanOrEqual(column, value),
+        sources.EqualTo(column, value),
+        sources.EqualNullSafe(column, value),
+        sources.Not(sources.EqualTo(column, value)),
+        sources.In(column, Array(value))
+      )
+      for (filter <- filters) {
+        assert(parquetFilters.createFilter(filter).isEmpty,
+          s"Row group filter $filter shouldn't be pushed down.")
+      }
+    }
+  }
+
+  test("don't push down filters when value type doesn't match column type") {
+    val schema = StructType(Seq(
+      StructField("cbyte", ByteType),
+      StructField("cshort", ShortType),
+      StructField("cint", IntegerType),
+      StructField("clong", LongType),
+      StructField("cfloat", FloatType),
+      StructField("cdouble", DoubleType),
+      StructField("cboolean", BooleanType),
+      StructField("cstring", StringType),
+      StructField("cdate", DateType),
+      StructField("ctimestamp", TimestampType),
+      StructField("cbinary", BinaryType),
+      StructField("cdecimal", DecimalType(10, 0))
+    ))
+
+    val parquetSchema = new SparkToParquetSchemaConverter(conf).convert(schema)
+    val parquetFilters = createParquetFilters(parquetSchema)
+
+    val filters = Seq(
+      sources.LessThan("cbyte", String.valueOf("1")),
+      sources.LessThan("cshort", JBigDecimal.valueOf(1)),
+      sources.LessThan("cint", JFloat.valueOf(JFloat.NaN)),
+      sources.LessThan("clong", String.valueOf("1")),
+      sources.LessThan("cfloat", JDouble.valueOf(1.0D)),
+      sources.LessThan("cdouble", JFloat.valueOf(1.0F)),
+      sources.LessThan("cboolean", String.valueOf("true")),
+      sources.LessThan("cstring", Integer.valueOf(1)),
+      sources.LessThan("cdate", Timestamp.valueOf("2018-01-01 00:00:00")),
+      sources.LessThan("ctimestamp", Date.valueOf("2018-01-01")),
+      sources.LessThan("cbinary", Integer.valueOf(1)),
+      sources.LessThan("cdecimal", Integer.valueOf(1234))
+    )
+    for (filter <- filters) {
+      assert(parquetFilters.createFilter(filter).isEmpty,
+        s"Row group filter $filter shouldn't be pushed down.")
+    }
+  }
+
   test("SPARK-6554: don't push down predicates which reference partition columns") {
     import testImplicits._
 
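
For contrast, a Long value that does fit in 32 bits should still produce a filter. A hedged
companion check in the style of the suite above, reusing its createParquetFilters helper and
conf (this assertion is illustrative, not part of the patch):

  // A Long within Int range still yields a push-down filter against an INT32
  // column, so only genuine overflows fall back to Spark-side evaluation.
  val intOnlySchema = StructType(Seq(StructField("cint", IntegerType)))
  val intOnlyFilters = createParquetFilters(
    new SparkToParquetSchemaConverter(conf).convert(intOnlySchema))
  assert(intOnlyFilters.createFilter(
    sources.GreaterThan("cint", JLong.valueOf(5L))).isDefined)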

ParquetQuerySuite.scala
@@ -1095,6 +1095,26 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSparkSession
     }
   }
 
+  test("row group skipping doesn't overflow when reading into larger type") {
+    withTempPath { path =>
+      Seq(0).toDF("a").write.parquet(path.toString)
+      // The vectorized and non-vectorized readers produce different exceptions; we don't need to
+      // test both, as this covers row group skipping.
+      withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "true") {
+        // Reading integer 'a' as a long isn't supported. Check that an exception is raised instead
+        // of incorrectly skipping the single row group and producing incorrect results.
+        val exception = intercept[SparkException] {
+          spark.read
+            .schema("a LONG")
+            .parquet(path.toString)
+            .where(s"a < ${Long.MaxValue}")
+            .collect()
+        }
+        assert(exception.getCause.getCause.isInstanceOf[SchemaColumnConvertNotSupportedException])
+      }
+    }
+  }
+
   test("SPARK-36825, SPARK-36852: create table with ANSI intervals") {
     withTable("tbl") {
       sql("create table tbl (c1 interval day, c2 interval year to month) using parquet")
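
The failure mode this test guards against: if the Long comparison value were naively narrowed to
Int when building the row-group filter, it would wrap around, turning the always-true predicate
a < Long.MaxValue into a < -1 and skipping the file's only row group. A quick plain-Scala
illustration of the wrap-around (not from the patch):

  // Truncating Long.MaxValue to 32 bits keeps only the low word, which is -1:
  assert(Long.MaxValue.toInt == -1)
  assert((Long.MaxValue - 1).toInt == -2)
  // A row group with stats min = 0, max = 0 cannot satisfy "a < -1", so it
  // would be skipped and the query would silently return no rows.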