
Conversation

@johanl-db
Contributor

What changes were proposed in this pull request?

This change adds a check for overflows when creating Parquet row group filters on an INT32 (byte/short/int) parquet type, to avoid incorrectly skipping row groups if the predicate value doesn't fit in an INT. This can happen if the read schema is specified as LONG, e.g. via .schema("col LONG").
While the Parquet readers don't support reading INT32 into a LONG, the overflow can lead to row groups being incorrectly skipped, bypassing the reader altogether and producing incorrect results instead of failing.

Why are the changes needed?

Reading a parquet file containing INT32 values with a read schema specified as LONG can produce incorrect results today:

Seq(0).toDF("a").write.parquet(path)
spark.read.schema("a LONG").parquet(path).where(s"a < ${Long.MaxValue}").collect()

will return an empty result. The correct result is either:

  • Failing the query if the parquet reader doesn't support upcasting integers to longs (all parquet readers in Spark today)
  • Returning result [0] if the parquet reader supports that upcast (no readers in Spark support it as of now, but I'm looking into adding this capability).
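
For illustration only (this is not the Spark source, and the names below are made up), a minimal Scala sketch of why the repro above can return an empty result when the predicate value is narrowed to Int while building the INT32 row group filter:

```scala
// Hypothetical sketch: narrowing a LONG predicate value to Int for an INT32 column.
val predicateValue: Long = Long.MaxValue
val narrowed: Int = predicateValue.toInt   // -1 after two's-complement truncation

// A row group filter built from the narrowed value effectively tests `a < -1`,
// so a row group whose min/max statistics are [0, 0] is incorrectly skipped.
assert(narrowed == -1)
```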

Does this PR introduce any user-facing change?

The following:

Seq(0).toDF("a").write.parquet(path)
spark.read.schema("a LONG").parquet(path).where(s"a < ${Long.MaxValue}").collect()

produces an (incorrect) empty result before this change. After this change, the read will fail, raising an error about the unsupported conversion from INT to LONG in the parquet reader.

How was this patch tested?

  • Added tests to ParquetFilterSuite to ensure that no row group filter is created when the predicate value overflows or when the value type isn't compatible with the parquet type
  • Added test to ParquetQuerySuite covering the correctness issue described above.

Was this patch authored or co-authored using generative AI tooling?

No

@johanl-db johanl-db force-pushed the SPARK-46092-row-group-skipping-overflow branch from ccb9e73 to a392ef5 Compare November 24, 2023 15:25
@github-actions github-actions bot added the SQL label Nov 24, 2023
@HyukjinKwon HyukjinKwon changed the title [SPARK-46092] Don't push down Parquet row group filters that overflow [SPARK-46092][SQL] Don't push down Parquet row group filters that overflow Nov 27, 2023
  case ParquetBooleanType => value.isInstanceOf[JBoolean]
  case ParquetIntegerType if value.isInstanceOf[Period] => true
- case ParquetByteType | ParquetShortType | ParquetIntegerType => value.isInstanceOf[Number]
+ case ParquetByteType | ParquetShortType | ParquetIntegerType => value match {
Member

I think you should add a comment explaining that there are only INT32 and INT64 physical integer types in the Parquet format specification; therefore, we don't check INT8 and/or INT16.

Member

Or, to be conservative, we could even allow only an exact match of the value on Int? cc @cloud-fan and @wangyum

Contributor Author

Added a comment. I also made the condition more restrictive: value must be either a java Byte/Short/Integer/Long. This excludes other Numbers such as Float/Double/BigDecimal that are generally not safe to cast to int and that the initial change didn't block. For example, float NaN gets cast to 0 and would pass the previous check.

We can't match only on Integer because for ParquetByteType and ParquetShortType the value will typically be a Byte or a Short, respectively. Also, this allows upcasting, where a parquet integer type is read with a larger Spark integer type.
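
As a quick illustration of the NaN case mentioned above (not code from the PR):

```scala
// Hypothetical sketch: why Float/Double values are excluded from the new check.
// Narrowing NaN to Int silently yields 0, so a Float.NaN predicate value would
// have passed the old isInstanceOf[Number] test and produced a bogus row group
// filter comparing against 0.
val nanAsInt: Int = Float.NaN.toInt
assert(nanAsInt == 0)
```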

@dongjoon-hyun dongjoon-hyun (Member) left a comment

Hi, @johanl-db. Do you happen to know what causes this? I'm curious whether this is an Apache Spark 3.5.0-only issue or not.

[Screenshot 2023-11-28 at 9:30:59 AM]

cc @sunchao

@sunchao sunchao (Member) left a comment

It's unfortunate that the check for Spark type versus Parquet type happens in ParquetVectorUpdaterFactory, which is after predicate pushdown for row groups. Will a similar issue happen for float to double in certain cases?

@johanl-db
Contributor Author

johanl-db commented Nov 29, 2023

It's unfortunate that the check for Spark type versus Parquet type happens in ParquetVectorUpdaterFactory, which is after predicate pushdown for row groups. Will a similar issue happen for float to double in certain cases?

There's no issue with float to double because we were already strict when deciding whether to build a row group filter: we only accept float values for float and double values for double, so no overflow is possible.

Hi, @johanl-db. Do you happen to know what causes this? I'm curious whether this is an Apache Spark 3.5.0-only issue or not.

When creating row group filters, we accept any value and don't check whether the value actually fits in the target type. If the read schema is LONG, for example, and the parquet type is INT32, you could pass a value that would overflow before this change. We have stricter type checks in the Parquet readers themselves, but by that time it's too late, as the row group may already have been incorrectly skipped and that check won't trigger.

Looking at https://github.com/apache/spark/blame/branch-3.0/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala#L478, this goes back at least to 3.0, and it seems the check was even less strict in earlier versions, so I'd say this behavior has always been there.

// Byte/Short/Int are all stored as INT32 in Parquet so filters are built using type Int.
// We don't create a filter if the value would overflow.
case _: JByte | _: JShort | _: Integer => true
case v: JLong => v.longValue() >= Int.MinValue && v.longValue() <= Int.MaxValue
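
A rough sketch of the behaviour of the JLong arm above (the helper name is made up, not an API from the PR):

```scala
// Hypothetical helper mirroring the JLong case: a Long predicate value only
// yields an INT32 row group filter when it fits in the Int range.
def fitsInInt(v: Long): Boolean = v >= Int.MinValue && v <= Int.MaxValue

assert(fitsInInt(42L))              // a filter can be built safely
assert(!fitsInInt(Long.MaxValue))   // no row group filter is created
```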
Contributor

For simplicity, how about forbidding the Long value directly?

Contributor Author

There are some use cases where this can be useful, assuming the Parquet file contains type INT32:

  • The user specifies a read schema using schema("col LONG").
  • The column in the table schema has type LONG.
    I don't know whether the latter can happen today but it will be possible in the near future for Delta tables as I'm looking into supporting type widening.

We could skip creating row group filters in that case but the logic is simple enough and it's going to be beneficial in the cases above.

Contributor

Thank you for the explanation. I understand the two use cases.
But users may not notice this detail and may be confused by the behavior. I think we could delay the support until Delta tables support filters on long values.
Of course, skipping row group filter creation if the value exceeds the int range looks good too.

Contributor Author

The plan would be to support this in the Delta version that will build on top of the next Spark version, so it would be good to have it here already.

Contributor

Spark has already supported the long type for a few releases, so we can't drop it now or we'd introduce perf regressions. I'm +1 for the change here.

@johanl-db johanl-db requested a review from beliefer December 1, 2023 13:51
@dongjoon-hyun
Member

Merged to master for Apache Spark 4.0.0.

Could you make backporting PRs to make sure that all tests pass in the release branches, @johanl-db?

Thank you, @johanl-db, @wangyum, @HyukjinKwon, @sunchao, @beliefer, @cloud-fan.

johanl-db added a commit to johanl-db/spark that referenced this pull request Dec 4, 2023

…rflow

Closes apache#44006 from johanl-db/SPARK-46092-row-group-skipping-overflow.

Authored-by: Johan Lasperas <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
johanl-db added a commit to johanl-db/spark that referenced this pull request Dec 4, 2023

…rflow

Closes apache#44006 from johanl-db/SPARK-46092-row-group-skipping-overflow.

Authored-by: Johan Lasperas <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
johanl-db added a commit to johanl-db/spark that referenced this pull request Dec 4, 2023

…rflow

Closes apache#44006 from johanl-db/SPARK-46092-row-group-skipping-overflow.

Authored-by: Johan Lasperas <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
johanl-db added a commit to johanl-db/spark that referenced this pull request Dec 4, 2023

…rflow

Closes apache#44006 from johanl-db/SPARK-46092-row-group-skipping-overflow.

Authored-by: Johan Lasperas <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
@johanl-db
Contributor Author

johanl-db commented Dec 4, 2023

@dongjoon-hyun I created backport PRs for the following branches:

  • branch-3.5: #44154
  • branch-3.4: #44155
  • branch-3.3: #44156

Any other branch I should target?

dongjoon-hyun pushed a commit that referenced this pull request Dec 4, 2023

…t overflow

This is a cherry-pick from #44006 to spark 3.5

Closes #44154 from johanl-db/SPARK-46092-row-group-skipping-overflow-3.5.

Authored-by: Johan Lasperas <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
dongjoon-hyun pushed a commit that referenced this pull request Dec 4, 2023

…t overflow

This is a cherry-pick from #44006 to spark 3.4

Closes #44155 from johanl-db/SPARK-46092-row-group-skipping-overflow-3.4.

Authored-by: Johan Lasperas <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
dongjoon-hyun pushed a commit that referenced this pull request Dec 4, 2023

…t overflow

This is a cherry-pick from #44006 to spark 3.3

Closes #44156 from johanl-db/SPARK-46092-row-group-skipping-overflow-3.3.

Authored-by: Johan Lasperas <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
@dongjoon-hyun
Member

Thank you, @johanl-db. Those branches are enough, and all PRs are merged now.

asl3 pushed a commit to asl3/spark that referenced this pull request Dec 5, 2023

…rflow

Closes apache#44006 from johanl-db/SPARK-46092-row-group-skipping-overflow.

Authored-by: Johan Lasperas <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
dbatomic pushed a commit to dbatomic/spark that referenced this pull request Dec 11, 2023

…rflow

Closes apache#44006 from johanl-db/SPARK-46092-row-group-skipping-overflow.

Authored-by: Johan Lasperas <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
szehon-ho pushed a commit to szehon-ho/spark that referenced this pull request Feb 7, 2024

…t overflow

This is a cherry-pick from apache#44006 to spark 3.4

Closes apache#44155 from johanl-db/SPARK-46092-row-group-skipping-overflow-3.4.

Authored-by: Johan Lasperas <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
turboFei pushed a commit to turboFei/spark that referenced this pull request Nov 6, 2025

…t overflow (apache#361)

This is a cherry-pick from apache#44006 to spark 3.5

Closes apache#44154 from johanl-db/SPARK-46092-row-group-skipping-overflow-3.5.

Authored-by: Johan Lasperas <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
Co-authored-by: Johan Lasperas <[email protected]>