
Commit 3469f5c

rdblue authored and cloud-fan committed
[SPARK-24230][SQL] Fix SpecificParquetRecordReaderBase with dictionary filters.
## What changes were proposed in this pull request?

I missed this commit when preparing #21070. When Parquet is able to filter blocks with dictionary filtering, the total value count Spark expected was too high, leading to an error when there were fewer row groups to process than expected. Spark should get the row groups from Parquet so that it picks up new filter schemes in Parquet, like dictionary filtering.

## How was this patch tested?

Running in production at Netflix. Added a test case for dictionary-filtered blocks.

Author: Ryan Blue <[email protected]>

Closes #21295 from rdblue/SPARK-24230-fix-parquet-block-tracking.
1 parent 4a14dc0 commit 3469f5c
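
Editor's note: the heart of the fix is which list of row groups feeds the totalRowCount sum. Below is a minimal sketch of the before/after counting strategies; ParquetFileReader and BlockMetaData are the real parquet-mr types used in the diff, while the class and method names here are illustrative, not Spark code.

import java.util.List;

import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.metadata.BlockMetaData;

class RowCountSketch {
  // Before the fix: sums over the footer's block list, which still includes
  // row groups that Parquet's dictionary filter will drop, so the expected
  // count can overshoot the rows the reader actually returns.
  static long countFromFooter(List<BlockMetaData> footerBlocks) {
    long total = 0;
    for (BlockMetaData block : footerBlocks) {
      total += block.getRowCount();
    }
    return total;
  }

  // After the fix: asks the reader itself; its row-group list already
  // reflects every filter scheme parquet-mr applied, dictionary filtering
  // included.
  static long countFromReader(ParquetFileReader reader) {
    long total = 0;
    for (BlockMetaData block : reader.getRowGroups()) {
      total += block.getRowCount();
    }
    return total;
  }
}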

2 files changed: 16 additions & 2 deletions

sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java

Lines changed: 4 additions & 2 deletions
@@ -146,7 +146,8 @@ public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptCont
     this.sparkSchema = StructType$.MODULE$.fromString(sparkRequestedSchemaString);
     this.reader = new ParquetFileReader(
         configuration, footer.getFileMetaData(), file, blocks, requestedSchema.getColumns());
-    for (BlockMetaData block : blocks) {
+    // use the blocks from the reader in case some do not match filters and will not be read
+    for (BlockMetaData block : reader.getRowGroups()) {
       this.totalRowCount += block.getRowCount();
     }

@@ -224,7 +225,8 @@ protected void initialize(String path, List<String> columns) throws IOException
     this.sparkSchema = new ParquetToSparkSchemaConverter(config).convert(requestedSchema);
     this.reader = new ParquetFileReader(
         config, footer.getFileMetaData(), file, blocks, requestedSchema.getColumns());
-    for (BlockMetaData block : blocks) {
+    // use the blocks from the reader in case some do not match filters and will not be read
+    for (BlockMetaData block : reader.getRowGroups()) {
       this.totalRowCount += block.getRowCount();
     }
   }

sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala

Lines changed: 12 additions & 0 deletions
@@ -879,6 +879,18 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext
       }
     }
   }
+
+  test("SPARK-24230: filter row group using dictionary") {
+    withSQLConf(("parquet.filter.dictionary.enabled", "true")) {
+      // create a table with values from 0, 2, ..., 18 that will be dictionary-encoded
+      withParquetTable((0 until 100).map(i => ((i * 2) % 20, s"data-$i")), "t") {
+        // search for a key that is not present so the dictionary filter eliminates all row groups
+        // Fails without SPARK-24230:
+        //   java.io.IOException: expecting more rows but reached last block. Read 0 out of 50
+        checkAnswer(sql("SELECT _2 FROM t WHERE t._1 = 5"), Seq.empty)
+      }
+    }
+  }
 }
 
 object TestingUDT {
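
Editor's note: the IOException quoted in the test comment comes from the reader running out of filtered row groups while the footer-based count still expects more rows. Below is a self-contained toy simulation of that mechanism; every name in it is illustrative, and none of it is Spark or parquet-mr API.

import java.io.IOException;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;

class EndOfRowGroupSketch {
  public static void main(String[] args) throws IOException {
    // The footer promises 50 rows, but dictionary filtering eliminated every
    // row group, so the reader has nothing left to hand back.
    long totalRowCount = 50;                        // footer-based expectation
    List<Long> filteredRowGroups = Collections.emptyList();

    long rowsReturned = 0;
    Iterator<Long> groups = filteredRowGroups.iterator();
    while (rowsReturned < totalRowCount) {
      if (!groups.hasNext()) {
        // Mirrors the failure quoted in the test comment above:
        // "expecting more rows but reached last block. Read 0 out of 50"
        throw new IOException("expecting more rows but reached last block. "
            + "Read " + rowsReturned + " out of " + totalRowCount);
      }
      rowsReturned += groups.next();                // consume one row group
    }
    System.out.println("read " + rowsReturned + " rows");
  }
}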
