From 1c97dc4b70de87b287fdd61d1187c908eea92713 Mon Sep 17 00:00:00 2001
From: yangjiang
Date: Fri, 4 Nov 2022 17:47:04 +0800
Subject: [PATCH 1/4] Add statistics for parquet page-level skipping

Signed-off-by: yangjiang
---
 .../src/datasource/file_format/parquet.rs     | 60 ++++++++++++------
 .../src/physical_plan/file_format/parquet.rs  | 61 ++++++++++++++++---
 .../file_format/parquet/metrics.rs            | 13 ++++
 .../file_format/parquet/page_filter.rs        | 14 +++++
 4 files changed, 119 insertions(+), 29 deletions(-)

diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs
index 07819bdf52cb..d7044bc4e4c7 100644
--- a/datafusion/core/src/datasource/file_format/parquet.rs
+++ b/datafusion/core/src/datasource/file_format/parquet.rs
@@ -532,25 +532,45 @@ pub(crate) mod test_util {
 
     pub async fn store_parquet(
         batches: Vec<RecordBatch>,
+        multi_page: bool,
     ) -> Result<(Vec<ObjectMeta>, Vec<NamedTempFile>)> {
-        let files: Vec<_> = batches
-            .into_iter()
-            .map(|batch| {
-                let mut output = NamedTempFile::new().expect("creating temp file");
-
-                let props = WriterProperties::builder().build();
-                let mut writer =
-                    ArrowWriter::try_new(&mut output, batch.schema(), Some(props))
-                        .expect("creating writer");
-
-                writer.write(&batch).expect("Writing batch");
-                writer.close().unwrap();
-                output
-            })
-            .collect();
-
-        let meta: Vec<_> = files.iter().map(local_unpartitioned_file).collect();
-        Ok((meta, files))
+        if multi_page {
+            // All batches are written into one file, so every batch must share the same schema.
+            let mut output = NamedTempFile::new().expect("creating temp file");
+            let mut builder = WriterProperties::builder();
+            // TODO: switch to a row-count limit once https://github.com/apache/arrow-rs/issues/2941 is released.
+            builder = builder.set_data_pagesize_limit(1);
+            builder = builder.set_write_batch_size(1);
+            let proper = builder.build();
+            let mut writer =
+                ArrowWriter::try_new(&mut output, batches[0].schema(), Some(proper))
+                    .expect("creating writer");
+            for b in batches {
+                writer.write(&b).expect("Writing batch");
+            }
+            writer.close().unwrap();
+            Ok((vec![local_unpartitioned_file(&output)], vec![output]))
+        } else {
+            // Each batch writes to its own file
+            let files: Vec<_> = batches
+                .into_iter()
+                .map(|batch| {
+                    let mut output = NamedTempFile::new().expect("creating temp file");
+
+                    let props = WriterProperties::builder().build();
+                    let mut writer =
+                        ArrowWriter::try_new(&mut output, batch.schema(), Some(props))
+                            .expect("creating writer");
+
+                    writer.write(&batch).expect("Writing batch");
+                    writer.close().unwrap();
+                    output
+                })
+                .collect();
+
+            let meta: Vec<_> = files.iter().map(local_unpartitioned_file).collect();
+            Ok((meta, files))
+        }
     }
 }
 
@@ -599,7 +619,7 @@ mod tests {
         let batch2 = RecordBatch::try_from_iter(vec![("c2", c2)]).unwrap();
 
         let store = Arc::new(LocalFileSystem::new()) as _;
-        let (meta, _files) = store_parquet(vec![batch1, batch2]).await?;
+        let (meta, _files) = store_parquet(vec![batch1, batch2], false).await?;
 
         let format = ParquetFormat::default();
         let schema = format.infer_schema(&store, &meta).await.unwrap();
@@ -738,7 +758,7 @@ mod tests {
         let store = Arc::new(RequestCountingObjectStore::new(Arc::new(
             LocalFileSystem::new(),
         )));
-        let (meta, _files) = store_parquet(vec![batch1, batch2]).await?;
+        let (meta, _files) = store_parquet(vec![batch1, batch2], false).await?;
 
         // Use a size hint larger than the parquet footer but smaller than the actual metadata, requiring a second fetch
         // for the remaining metadata
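For orientation, here is a minimal sketch (not part of the patch) of how the new `multi_page` flag is meant to be exercised, assuming access to the crate-private `test_util::store_parquet` helper above; `demo` is a hypothetical wrapper function:

```rust
use std::sync::Arc;

use arrow::array::{ArrayRef, Int32Array};
use arrow::record_batch::RecordBatch;
use datafusion::error::Result;

// Write two same-schema batches into a single temp parquet file so each
// batch lands on its own data page(s), giving the page index something
// to prune.
async fn demo() -> Result<()> {
    let c1: ArrayRef = Arc::new(Int32Array::from(vec![Some(1), None, Some(2)]));
    let c2: ArrayRef = Arc::new(Int32Array::from(vec![Some(3), Some(4), Some(5)]));
    let batch1 = RecordBatch::try_from_iter(vec![("int", c1)])?;
    let batch2 = RecordBatch::try_from_iter(vec![("int", c2)])?;

    // `true` selects the new single-file, multi-page branch.
    let (_meta, _files) = store_parquet(vec![batch1, batch2], true).await?;
    Ok(())
}
```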
diff --git a/datafusion/core/src/physical_plan/file_format/parquet.rs b/datafusion/core/src/physical_plan/file_format/parquet.rs
index 87c3c220d186..e678efff9226 100644
--- a/datafusion/core/src/physical_plan/file_format/parquet.rs
+++ b/datafusion/core/src/physical_plan/file_format/parquet.rs
@@ -449,7 +449,7 @@ impl FileOpener for ParquetOpener {
             // page index pruning: if all data on individual pages can
             // be ruled out using page metadata, rows from other columns
             // with that range can be skipped as well
-            if let Some(row_selection) = enable_page_index
+            if let Some(row_selection) = (enable_page_index && !row_groups.is_empty())
                 .then(|| {
                     page_filter::build_page_filter(
                         pruning_predicate.as_ref(),
@@ -919,7 +919,7 @@ mod tests {
         datasource::file_format::{parquet::ParquetFormat, FileFormat},
         physical_plan::collect,
     };
-    use arrow::array::Float32Array;
+    use arrow::array::{Float32Array, Int32Array};
     use arrow::datatypes::DataType::Decimal128;
     use arrow::record_batch::RecordBatch;
     use arrow::{
@@ -960,9 +960,16 @@ mod tests {
         predicate: Option<Expr>,
         pushdown_predicate: bool,
     ) -> Result<Vec<RecordBatch>> {
-        round_trip(batches, projection, schema, predicate, pushdown_predicate)
-            .await
-            .batches
+        round_trip(
+            batches,
+            projection,
+            schema,
+            predicate,
+            pushdown_predicate,
+            false,
+        )
+        .await
+        .batches
     }
 
     /// Writes each RecordBatch as an individual parquet file and then
@@ -974,6 +981,7 @@ mod tests {
         schema: Option<SchemaRef>,
         predicate: Option<Expr>,
         pushdown_predicate: bool,
+        page_index_predicate: bool,
     ) -> RoundTripResult {
         let file_schema = match schema {
             Some(schema) => schema,
@@ -983,7 +991,7 @@ mod tests {
             ),
         };
 
-        let (meta, _files) = store_parquet(batches).await.unwrap();
+        let (meta, _files) = store_parquet(batches, page_index_predicate).await.unwrap();
         let file_groups = meta.into_iter().map(Into::into).collect();
 
         // prepare the scan
@@ -1008,6 +1016,10 @@ mod tests {
                 .with_reorder_filters(true);
         }
 
+        if page_index_predicate {
+            parquet_exec = parquet_exec.with_enable_page_index(true);
+        }
+
         let session_ctx = SessionContext::new();
         let task_ctx = session_ctx.task_ctx();
         let parquet_exec = Arc::new(parquet_exec);
@@ -1225,7 +1237,8 @@ mod tests {
         let filter = col("c2").eq(lit(2_i64));
 
         // read/write them files:
-        let rt = round_trip(vec![batch1, batch2], None, None, Some(filter), true).await;
+        let rt =
+            round_trip(vec![batch1, batch2], None, None, Some(filter), true, false).await;
 
         let expected = vec![
             "+----+----+----+",
             "| c1 | c3 | c2 |",
@@ -1374,7 +1387,8 @@ mod tests {
         let filter = col("c2").eq(lit(1_i64));
 
         // read/write them files:
-        let rt = round_trip(vec![batch1, batch2], None, None, Some(filter), true).await;
+        let rt =
+            round_trip(vec![batch1, batch2], None, None, Some(filter), true, false).await;
 
         let expected = vec![
             "+----+----+",
@@ -1695,6 +1709,35 @@ mod tests {
         Ok(())
     }
 
+    #[tokio::test]
+    async fn parquet_page_index_exec_metrics() {
+        let c1: ArrayRef = Arc::new(Int32Array::from(vec![Some(1), None, Some(2)]));
+        let c2: ArrayRef = Arc::new(Int32Array::from(vec![Some(3), Some(4), Some(5)]));
+        let batch1 = create_batch(vec![("int", c1.clone())]);
+        let batch2 = create_batch(vec![("int", c2.clone())]);
+
+        let filter = col("int").eq(lit(4_i32));
+
+        let rt =
+            round_trip(vec![batch1, batch2], None, None, Some(filter), false, true).await;
+
+        let metrics = rt.parquet_exec.metrics().unwrap();
+
+        // TODO: fix this once https://github.com/apache/arrow-rs/issues/2941 is released (switch to a row-count limit).
+        // assert the batches and some metrics
+        let expected = vec![
+            "+-----+", "| int |", "+-----+", "|     |", "| 1   |", "| 2   |",
+            "| 3   |", "| 4   |", "| 5   |", "+-----+",
+        ];
+        assert_batches_sorted_eq!(expected, &rt.batches.unwrap());
+        assert_eq!(get_value(&metrics, "page_index_rows_filtered"), 0);
+        assert!(
+            get_value(&metrics, "page_index_eval_time") > 0,
+            "no eval time in metrics: {:#?}",
+            metrics
+        );
+    }
+
     #[tokio::test]
     async fn parquet_exec_metrics() {
         let c1: ArrayRef = Arc::new(StringArray::from(vec![
@@ -1714,7 +1757,7 @@ mod tests {
         let filter = col("c1").not_eq(lit("bar"));
 
         // read/write them files:
-        let rt = round_trip(vec![batch1], None, None, Some(filter), true).await;
+        let rt = round_trip(vec![batch1], None, None, Some(filter), true, false).await;
 
         let metrics = rt.parquet_exec.metrics().unwrap();
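The new assertions go through the test module's existing `get_value` accessor; something like the following sketch, which sums a named metric across all files/partitions in the exec's `MetricsSet` (names taken from the test above):

```rust
use datafusion::physical_plan::metrics::MetricsSet;

// Pull a single named metric (e.g. "page_index_rows_filtered") out of the
// ParquetExec metrics, summed over all partitions/files.
fn get_value(metrics: &MetricsSet, metric_name: &str) -> usize {
    match metrics.sum_by_name(metric_name) {
        Some(v) => v.as_usize(),
        _ => panic!("metric '{}' not found in {:#?}", metric_name, metrics),
    }
}
```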
diff --git a/datafusion/core/src/physical_plan/file_format/parquet/metrics.rs b/datafusion/core/src/physical_plan/file_format/parquet/metrics.rs
index 58e340e62417..64e08753abe2 100644
--- a/datafusion/core/src/physical_plan/file_format/parquet/metrics.rs
+++ b/datafusion/core/src/physical_plan/file_format/parquet/metrics.rs
@@ -35,6 +35,10 @@ pub struct ParquetFileMetrics {
     pub pushdown_rows_filtered: Count,
     /// Total time spent evaluating pushdown filters
     pub pushdown_eval_time: Time,
+    /// Total rows filtered out by parquet page index
+    pub page_index_rows_filtered: Count,
+    /// Total time spent evaluating parquet page index filters
+    pub page_index_eval_time: Time,
 }
 
 impl ParquetFileMetrics {
@@ -63,6 +67,13 @@ impl ParquetFileMetrics {
         let pushdown_eval_time = MetricBuilder::new(metrics)
             .with_new_label("filename", filename.to_string())
             .subset_time("pushdown_eval_time", partition);
+        let page_index_rows_filtered = MetricBuilder::new(metrics)
+            .with_new_label("filename", filename.to_string())
+            .counter("page_index_rows_filtered", partition);
+
+        let page_index_eval_time = MetricBuilder::new(metrics)
+            .with_new_label("filename", filename.to_string())
+            .subset_time("page_index_eval_time", partition);
 
         Self {
             predicate_evaluation_errors,
@@ -70,6 +81,8 @@ impl ParquetFileMetrics {
             bytes_scanned,
             pushdown_rows_filtered,
             pushdown_eval_time,
+            page_index_rows_filtered,
+            page_index_eval_time,
         }
     }
 }
diff --git a/datafusion/core/src/physical_plan/file_format/parquet/page_filter.rs b/datafusion/core/src/physical_plan/file_format/parquet/page_filter.rs
index 37002af87608..828d213758fd 100644
--- a/datafusion/core/src/physical_plan/file_format/parquet/page_filter.rs
+++ b/datafusion/core/src/physical_plan/file_format/parquet/page_filter.rs
@@ -100,6 +100,8 @@ pub(crate) fn build_page_filter(
     file_metadata: &ParquetMetaData,
     file_metrics: &ParquetFileMetrics,
 ) -> Result<Option<RowSelection>> {
+    // scoped timer updates on drop
+    let _timer_guard = file_metrics.page_index_eval_time.timer();
     let page_index_predicates =
         extract_page_index_push_down_predicates(pruning_predicate, schema)?;
 
@@ -154,6 +156,18 @@ pub(crate) fn build_page_filter(
             row_selections.push_back(selectors.into_iter().flatten().collect::<Vec<_>>());
         }
         let final_selection = combine_multi_col_selection(row_selections);
+        let total_skip =
+            final_selection.iter().fold(
+                0,
+                |acc, x| {
+                    if x.skip {
+                        acc + x.row_count
+                    } else {
+                        acc
+                    }
+                },
+            );
+        file_metrics.page_index_rows_filtered.add(total_skip);
         Ok(Some(final_selection.into()))
     } else {
         Ok(None)
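To make the new counter concrete: `page_index_rows_filtered` accumulates the `row_count` of every `skip` run in the final `RowSelection`. A standalone illustration using the parquet crate's public `arrow_reader` types; the filter/map/sum chain is equivalent to the fold in the patch:

```rust
use parquet::arrow::arrow_reader::{RowSelection, RowSelector};

fn main() {
    let selectors = vec![
        RowSelector::skip(3),   // e.g. a pruned page holding [1, NULL, 2]
        RowSelector::select(3), // rows that must still be decoded
    ];

    // Sum of `row_count` over the `skip` runs -- exactly what the metric counts.
    let total_skip: usize = selectors
        .iter()
        .filter(|s| s.skip)
        .map(|s| s.row_count)
        .sum();
    assert_eq!(total_skip, 3);

    // `build_page_filter` hands the reader the equivalent `RowSelection`.
    let _selection = RowSelection::from(selectors);
}
```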
From 005b10140e0b2da6340d2070c0d6086b475a4f9e Mon Sep 17 00:00:00 2001
From: yangjiang
Date: Sat, 5 Nov 2022 22:16:18 +0800
Subject: [PATCH 2/4] Use page row-count limit to fix unit test

Signed-off-by: yangjiang
---
 datafusion/core/src/datasource/file_format/parquet.rs    | 4 +---
 datafusion/core/src/physical_plan/file_format/parquet.rs | 5 ++---
 2 files changed, 3 insertions(+), 6 deletions(-)

diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs
index d7044bc4e4c7..3de58d456a89 100644
--- a/datafusion/core/src/datasource/file_format/parquet.rs
+++ b/datafusion/core/src/datasource/file_format/parquet.rs
@@ -538,9 +538,7 @@ pub(crate) mod test_util {
             // All batches are written into one file, so every batch must share the same schema.
             let mut output = NamedTempFile::new().expect("creating temp file");
             let mut builder = WriterProperties::builder();
-            // TODO: switch to a row-count limit once https://github.com/apache/arrow-rs/issues/2941 is released.
-            builder = builder.set_data_pagesize_limit(1);
-            builder = builder.set_write_batch_size(1);
+            builder = builder.set_data_page_row_count_limit(2);
             let proper = builder.build();
             let mut writer =
                 ArrowWriter::try_new(&mut output, batches[0].schema(), Some(proper))
                     .expect("creating writer");
diff --git a/datafusion/core/src/physical_plan/file_format/parquet.rs b/datafusion/core/src/physical_plan/file_format/parquet.rs
index e678efff9226..b72d9b3990bb 100644
--- a/datafusion/core/src/physical_plan/file_format/parquet.rs
+++ b/datafusion/core/src/physical_plan/file_format/parquet.rs
@@ -1726,11 +1726,10 @@ mod tests {
         // TODO: fix this once https://github.com/apache/arrow-rs/issues/2941 is released (switch to a row-count limit).
         // assert the batches and some metrics
         let expected = vec![
-            "+-----+", "| int |", "+-----+", "|     |", "| 1   |", "| 2   |",
-            "| 3   |", "| 4   |", "| 5   |", "+-----+",
+            "+-----+", "| int |", "+-----+", "| 3 |", "| 4 |", "| 5 |", "+-----+",
         ];
         assert_batches_sorted_eq!(expected, &rt.batches.unwrap());
-        assert_eq!(get_value(&metrics, "page_index_rows_filtered"), 0);
+        assert_eq!(get_value(&metrics, "page_index_rows_filtered"), 3);
         assert!(
             get_value(&metrics, "page_index_eval_time") > 0,
             "no eval time in metrics: {:#?}",
             metrics
         );
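For context, the writer configuration patch 2 switches to looks like this in isolation. `set_data_page_row_count_limit` is the API the earlier TODO was waiting on; a minimal sketch, assuming a parquet crate release that includes it:

```rust
use parquet::file::properties::WriterProperties;

fn main() {
    // Cap each data page at two rows, so a three-row batch spans pages.
    // Per the parquet crate docs this is a best-effort limit: it is only
    // checked at `write_batch_size` boundaries.
    let _props = WriterProperties::builder()
        .set_data_page_row_count_limit(2)
        .build();
}
```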
From 649cc9399b01fb8663447b445235da88f0d010d1 Mon Sep 17 00:00:00 2001
From: yangjiang
Date: Sat, 5 Nov 2022 22:35:04 +0800
Subject: [PATCH 3/4] Fix expected table spacing in page index test

Signed-off-by: yangjiang
---
 datafusion/core/src/physical_plan/file_format/parquet.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/datafusion/core/src/physical_plan/file_format/parquet.rs b/datafusion/core/src/physical_plan/file_format/parquet.rs
index b72d9b3990bb..0a5691568fc5 100644
--- a/datafusion/core/src/physical_plan/file_format/parquet.rs
+++ b/datafusion/core/src/physical_plan/file_format/parquet.rs
@@ -1726,7 +1726,7 @@ mod tests {
         // TODO: fix this once https://github.com/apache/arrow-rs/issues/2941 is released (switch to a row-count limit).
         // assert the batches and some metrics
         let expected = vec![
-            "+-----+", "| int |", "+-----+", "| 3 |", "| 4 |", "| 5 |", "+-----+",
+            "+-----+", "| int |", "+-----+", "| 3   |", "| 4   |", "| 5   |", "+-----+",
         ];
         assert_batches_sorted_eq!(expected, &rt.batches.unwrap());
         assert_eq!(get_value(&metrics, "page_index_rows_filtered"), 3);

From 12e7e1cfbaf87c2396dbd40a34e06ab6e9cd80be Mon Sep 17 00:00:00 2001
From: yangjiang
Date: Sat, 5 Nov 2022 22:54:37 +0800
Subject: [PATCH 4/4] Remove resolved todo

Signed-off-by: yangjiang
---
 datafusion/core/src/physical_plan/file_format/parquet.rs | 1 -
 1 file changed, 1 deletion(-)

diff --git a/datafusion/core/src/physical_plan/file_format/parquet.rs b/datafusion/core/src/physical_plan/file_format/parquet.rs
index 0a5691568fc5..90ff8587906d 100644
--- a/datafusion/core/src/physical_plan/file_format/parquet.rs
+++ b/datafusion/core/src/physical_plan/file_format/parquet.rs
@@ -1723,7 +1723,6 @@ mod tests {
         let metrics = rt.parquet_exec.metrics().unwrap();
 
-        // TODO: fix this once https://github.com/apache/arrow-rs/issues/2941 is released (switch to a row-count limit).
         // assert the batches and some metrics
        let expected = vec![
            "+-----+", "| int |", "+-----+", "| 3   |", "| 4   |", "| 5   |", "+-----+",
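Finally, a hedged end-to-end sketch (not part of the patch series) of observing the new counters outside the test harness via `EXPLAIN ANALYZE`; the config key and the file name `data.parquet` are assumptions that may differ by DataFusion version:

```rust
use datafusion::error::Result;
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> Result<()> {
    // Assumed config key: enables page-index pruning for parquet scans.
    let config = SessionConfig::new()
        .set_bool("datafusion.execution.parquet.enable_page_index", true);
    let ctx = SessionContext::with_config(config);

    // "data.parquet" is a placeholder file written with multi-page settings.
    ctx.register_parquet("t", "data.parquet", ParquetReadOptions::default())
        .await?;

    // `page_index_rows_filtered` and `page_index_eval_time` should appear
    // among the ParquetExec metrics in the analyzed plan output.
    ctx.sql("EXPLAIN ANALYZE SELECT * FROM t WHERE int = 4")
        .await?
        .show()
        .await?;
    Ok(())
}
```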