
Commit 4d23cae

Add metrics for parquet page level skipping (#4105)
* Add statistics for parquet page level skipping
* Use the page row limit to fix the unit test
* Fix
* Remove todo

Signed-off-by: yangjiang <[email protected]>
1 parent b7a3331 commit 4d23cae

File tree

4 files changed (+115 -29 lines)


datafusion/core/src/datasource/file_format/parquet.rs

Lines changed: 38 additions & 20 deletions
```diff
@@ -532,25 +532,43 @@ pub(crate) mod test_util {
 
     pub async fn store_parquet(
         batches: Vec<RecordBatch>,
+        multi_page: bool,
     ) -> Result<(Vec<ObjectMeta>, Vec<NamedTempFile>)> {
-        let files: Vec<_> = batches
-            .into_iter()
-            .map(|batch| {
-                let mut output = NamedTempFile::new().expect("creating temp file");
-
-                let props = WriterProperties::builder().build();
-                let mut writer =
-                    ArrowWriter::try_new(&mut output, batch.schema(), Some(props))
-                        .expect("creating writer");
-
-                writer.write(&batch).expect("Writing batch");
-                writer.close().unwrap();
-                output
-            })
-            .collect();
-
-        let meta: Vec<_> = files.iter().map(local_unpartitioned_file).collect();
-        Ok((meta, files))
+        if multi_page {
+            // All batches are written into one file; each batch must have the same schema.
+            let mut output = NamedTempFile::new().expect("creating temp file");
+            let mut builder = WriterProperties::builder();
+            builder = builder.set_data_page_row_count_limit(2);
+            let proper = builder.build();
+            let mut writer =
+                ArrowWriter::try_new(&mut output, batches[0].schema(), Some(proper))
+                    .expect("creating writer");
+            for b in batches {
+                writer.write(&b).expect("Writing batch");
+            }
+            writer.close().unwrap();
+            Ok((vec![local_unpartitioned_file(&output)], vec![output]))
+        } else {
+            // Each batch is written to its own file
+            let files: Vec<_> = batches
+                .into_iter()
+                .map(|batch| {
+                    let mut output = NamedTempFile::new().expect("creating temp file");
+
+                    let props = WriterProperties::builder().build();
+                    let mut writer =
+                        ArrowWriter::try_new(&mut output, batch.schema(), Some(props))
+                            .expect("creating writer");
+
+                    writer.write(&batch).expect("Writing batch");
+                    writer.close().unwrap();
+                    output
+                })
+                .collect();
+
+            let meta: Vec<_> = files.iter().map(local_unpartitioned_file).collect();
+            Ok((meta, files))
+        }
     }
 }
 
@@ -599,7 +617,7 @@ mod tests {
         let batch2 = RecordBatch::try_from_iter(vec![("c2", c2)]).unwrap();
 
         let store = Arc::new(LocalFileSystem::new()) as _;
-        let (meta, _files) = store_parquet(vec![batch1, batch2]).await?;
+        let (meta, _files) = store_parquet(vec![batch1, batch2], false).await?;
 
         let format = ParquetFormat::default();
         let schema = format.infer_schema(&store, &meta).await.unwrap();
@@ -738,7 +756,7 @@ mod tests {
         let store = Arc::new(RequestCountingObjectStore::new(Arc::new(
             LocalFileSystem::new(),
         )));
-        let (meta, _files) = store_parquet(vec![batch1, batch2]).await?;
+        let (meta, _files) = store_parquet(vec![batch1, batch2], false).await?;
 
         // Use a size hint larger than the parquet footer but smaller than the actual metadata, requiring a second fetch
         // for the remaining metadata
```
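
For reference, here is a minimal, self-contained sketch of what the new `multi_page: true` path produces: several batches written into a single parquet file with `set_data_page_row_count_limit(2)`, so each column chunk ends up with multiple small data pages for the page index to prune. The function name, crate imports, and example data below are illustrative assumptions, not part of the commit.

```rust
use std::sync::Arc;

use arrow::array::{ArrayRef, Int32Array};
use arrow::record_batch::RecordBatch;
use parquet::arrow::ArrowWriter;
use parquet::file::properties::WriterProperties;
use tempfile::NamedTempFile;

// Hypothetical helper mirroring the `multi_page` branch of `store_parquet`.
fn write_multi_page_file() -> parquet::errors::Result<NamedTempFile> {
    // Two small batches sharing one schema, similar to the test data added in this commit.
    let a: ArrayRef = Arc::new(Int32Array::from(vec![Some(1), None, Some(2)]));
    let b: ArrayRef = Arc::new(Int32Array::from(vec![Some(3), Some(4), Some(5)]));
    let batch1 = RecordBatch::try_from_iter(vec![("int", a)]).unwrap();
    let batch2 = RecordBatch::try_from_iter(vec![("int", b)]).unwrap();

    let mut output = NamedTempFile::new().expect("creating temp file");

    // At most 2 rows per data page, so even 6 rows span several pages.
    let props = WriterProperties::builder()
        .set_data_page_row_count_limit(2)
        .build();

    let mut writer = ArrowWriter::try_new(&mut output, batch1.schema(), Some(props))?;
    for batch in [batch1, batch2] {
        writer.write(&batch)?;
    }
    writer.close()?;
    Ok(output)
}
```

The per-batch path (one file per `RecordBatch`) is unchanged; existing callers simply pass `false` for the new flag, as the test updates above show.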

datafusion/core/src/physical_plan/file_format/parquet.rs

Lines changed: 50 additions & 9 deletions
```diff
@@ -445,7 +445,7 @@ impl FileOpener for ParquetOpener {
             // page index pruning: if all data on individual pages can
             // be ruled using page metadata, rows from other columns
             // with that range can be skipped as well
-            if let Some(row_selection) = enable_page_index
+            if let Some(row_selection) = (enable_page_index && !row_groups.is_empty())
                 .then(|| {
                     page_filter::build_page_filter(
                         pruning_predicate.as_ref(),
@@ -914,7 +914,7 @@ mod tests {
         datasource::file_format::{parquet::ParquetFormat, FileFormat},
         physical_plan::collect,
     };
-    use arrow::array::Float32Array;
+    use arrow::array::{Float32Array, Int32Array};
     use arrow::datatypes::DataType::Decimal128;
     use arrow::record_batch::RecordBatch;
     use arrow::{
@@ -955,9 +955,16 @@ mod tests {
         predicate: Option<Expr>,
         pushdown_predicate: bool,
     ) -> Result<Vec<RecordBatch>> {
-        round_trip(batches, projection, schema, predicate, pushdown_predicate)
-            .await
-            .batches
+        round_trip(
+            batches,
+            projection,
+            schema,
+            predicate,
+            pushdown_predicate,
+            false,
+        )
+        .await
+        .batches
     }
 
     /// Writes each RecordBatch as an individual parquet file and then
@@ -969,6 +976,7 @@ mod tests {
         schema: Option<SchemaRef>,
         predicate: Option<Expr>,
         pushdown_predicate: bool,
+        page_index_predicate: bool,
     ) -> RoundTripResult {
         let file_schema = match schema {
             Some(schema) => schema,
@@ -978,7 +986,7 @@
             ),
         };
 
-        let (meta, _files) = store_parquet(batches).await.unwrap();
+        let (meta, _files) = store_parquet(batches, page_index_predicate).await.unwrap();
         let file_groups = meta.into_iter().map(Into::into).collect();
 
         // prepare the scan
@@ -1003,6 +1011,10 @@
                 .with_reorder_filters(true);
         }
 
+        if page_index_predicate {
+            parquet_exec = parquet_exec.with_enable_page_index(true);
+        }
+
         let session_ctx = SessionContext::new();
         let task_ctx = session_ctx.task_ctx();
         let parquet_exec = Arc::new(parquet_exec);
@@ -1220,7 +1232,8 @@
         let filter = col("c2").eq(lit(2_i64));
 
         // read/write them files:
-        let rt = round_trip(vec![batch1, batch2], None, None, Some(filter), true).await;
+        let rt =
+            round_trip(vec![batch1, batch2], None, None, Some(filter), true, false).await;
         let expected = vec![
             "+----+----+----+",
             "| c1 | c3 | c2 |",
@@ -1369,7 +1382,8 @@
         let filter = col("c2").eq(lit(1_i64));
 
         // read/write them files:
-        let rt = round_trip(vec![batch1, batch2], None, None, Some(filter), true).await;
+        let rt =
+            round_trip(vec![batch1, batch2], None, None, Some(filter), true, false).await;
 
         let expected = vec![
             "+----+----+",
@@ -1690,6 +1704,33 @@
         Ok(())
     }
 
+    #[tokio::test]
+    async fn parquet_page_index_exec_metrics() {
+        let c1: ArrayRef = Arc::new(Int32Array::from(vec![Some(1), None, Some(2)]));
+        let c2: ArrayRef = Arc::new(Int32Array::from(vec![Some(3), Some(4), Some(5)]));
+        let batch1 = create_batch(vec![("int", c1.clone())]);
+        let batch2 = create_batch(vec![("int", c2.clone())]);
+
+        let filter = col("int").eq(lit(4_i32));
+
+        let rt =
+            round_trip(vec![batch1, batch2], None, None, Some(filter), false, true).await;
+
+        let metrics = rt.parquet_exec.metrics().unwrap();
+
+        // assert the batches and some metrics
+        let expected = vec![
+            "+-----+", "| int |", "+-----+", "| 3   |", "| 4   |", "| 5   |", "+-----+",
+        ];
+        assert_batches_sorted_eq!(expected, &rt.batches.unwrap());
+        assert_eq!(get_value(&metrics, "page_index_rows_filtered"), 3);
+        assert!(
+            get_value(&metrics, "page_index_eval_time") > 0,
+            "no eval time in metrics: {:#?}",
+            metrics
+        );
+    }
+
     #[tokio::test]
     async fn parquet_exec_metrics() {
         let c1: ArrayRef = Arc::new(StringArray::from(vec![
@@ -1709,7 +1750,7 @@
         let filter = col("c1").not_eq(lit("bar"));
 
         // read/write them files:
-        let rt = round_trip(vec![batch1], None, None, Some(filter), true).await;
+        let rt = round_trip(vec![batch1], None, None, Some(filter), true, false).await;
 
         let metrics = rt.parquet_exec.metrics().unwrap();
 
```
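
The new `parquet_page_index_exec_metrics` test reads the counters through a `get_value` helper defined elsewhere in the test module and not shown in this diff. A hypothetical equivalent is sketched below to illustrate how a named metric such as `page_index_rows_filtered` can be pulled out of the plan's `MetricsSet`; the function name and the summing behaviour are assumptions, not the actual helper.

```rust
use datafusion::physical_plan::metrics::MetricsSet;

/// Sum every metric whose name matches `name` (e.g. "page_index_rows_filtered")
/// across all files/partitions recorded in the set.
fn metric_sum(metrics: &MetricsSet, name: &str) -> usize {
    metrics
        .iter()
        .filter(|metric| metric.value().name() == name)
        .map(|metric| metric.value().as_usize())
        .sum()
}
```

In the test itself the predicate is not pushed down (`pushdown_predicate` is `false`), so filtering happens only at page granularity: the pages holding the first batch cannot contain `int = 4` and are skipped, giving the 3 rows counted by `page_index_rows_filtered`, while the kept pages still return 3 and 5 alongside 4.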

datafusion/core/src/physical_plan/file_format/parquet/metrics.rs

Lines changed: 13 additions & 0 deletions
```diff
@@ -35,6 +35,10 @@ pub struct ParquetFileMetrics {
     pub pushdown_rows_filtered: Count,
     /// Total time spent evaluating pushdown filters
    pub pushdown_eval_time: Time,
+    /// Total rows filtered out by parquet page index
+    pub page_index_rows_filtered: Count,
+    /// Total time spent evaluating parquet page index filters
+    pub page_index_eval_time: Time,
 }
 
 impl ParquetFileMetrics {
@@ -63,13 +67,22 @@
         let pushdown_eval_time = MetricBuilder::new(metrics)
             .with_new_label("filename", filename.to_string())
             .subset_time("pushdown_eval_time", partition);
+        let page_index_rows_filtered = MetricBuilder::new(metrics)
+            .with_new_label("filename", filename.to_string())
+            .counter("page_index_rows_filtered", partition);
+
+        let page_index_eval_time = MetricBuilder::new(metrics)
+            .with_new_label("filename", filename.to_string())
+            .subset_time("page_index_eval_time", partition);
 
         Self {
             predicate_evaluation_errors,
             row_groups_pruned,
             bytes_scanned,
             pushdown_rows_filtered,
             pushdown_eval_time,
+            page_index_rows_filtered,
+            page_index_eval_time,
         }
     }
 }
```
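
A short sketch of how the two new metrics are meant to be driven from operator code, mirroring the `page_filter.rs` change below: the `Time` accumulates through a scoped timer guard and the `Count` through explicit `add` calls. The function name, file name, and partition index are placeholders, and `ParquetFileMetrics` is assumed to be in scope from this module.

```rust
use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;

// Placeholder call site; the real one is build_page_filter in page_filter.rs.
fn record_page_index_pruning(rows_skipped: usize) {
    let metrics = ExecutionPlanMetricsSet::new();
    let file_metrics = ParquetFileMetrics::new(0, "file.parquet", &metrics);

    // The guard adds the elapsed wall time to page_index_eval_time when dropped.
    let timer = file_metrics.page_index_eval_time.timer();
    file_metrics.page_index_rows_filtered.add(rows_skipped);
    drop(timer);
}
```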

datafusion/core/src/physical_plan/file_format/parquet/page_filter.rs

Lines changed: 14 additions & 0 deletions
```diff
@@ -100,6 +100,8 @@ pub(crate) fn build_page_filter(
     file_metadata: &ParquetMetaData,
     file_metrics: &ParquetFileMetrics,
 ) -> Result<Option<RowSelection>> {
+    // scoped timer updates on drop
+    let _timer_guard = file_metrics.page_index_eval_time.timer();
     let page_index_predicates =
         extract_page_index_push_down_predicates(pruning_predicate, schema)?;
 
@@ -154,6 +156,18 @@
             row_selections.push_back(selectors.into_iter().flatten().collect::<Vec<_>>());
         }
         let final_selection = combine_multi_col_selection(row_selections);
+        let total_skip =
+            final_selection.iter().fold(
+                0,
+                |acc, x| {
+                    if x.skip {
+                        acc + x.row_count
+                    } else {
+                        acc
+                    }
+                },
+            );
+        file_metrics.page_index_rows_filtered.add(total_skip);
         Ok(Some(final_selection.into()))
     } else {
         Ok(None)
```
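
The fold above simply totals the rows covered by `skip` selectors in the combined selection. Below is a standalone illustration of the same computation over the `parquet` crate's `RowSelector` type; the example selection is made up.

```rust
use parquet::arrow::arrow_reader::RowSelector;

/// Rows pruned by the page index: the sum of `row_count` over `skip` selectors.
fn skipped_rows(selection: &[RowSelector]) -> usize {
    selection.iter().filter(|s| s.skip).map(|s| s.row_count).sum()
}

fn main() {
    // skip 3 rows, read 2, skip 1 more -> 4 rows reported as filtered
    let selection = vec![
        RowSelector::skip(3),
        RowSelector::select(2),
        RowSelector::skip(1),
    ];
    assert_eq!(skipped_rows(&selection), 4);
}
```

This total is the value that ends up in `page_index_rows_filtered` and that the new test asserts equals 3.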
