36 changes: 19 additions & 17 deletions benchmarks/src/bin/parquet_filter_pushdown.rs
@@ -21,6 +21,7 @@ use datafusion::logical_expr::{lit, or, Expr};
use datafusion::optimizer::utils::disjunction;
use datafusion::physical_plan::collect;
use datafusion::prelude::{col, SessionConfig, SessionContext};
use parquet::file::properties::WriterProperties;
use parquet_test_utils::{ParquetScanOptions, TestParquetFile};
use std::path::PathBuf;
use std::time::Instant;
@@ -73,7 +74,19 @@ async fn main() -> Result<()> {

let path = opt.path.join("logs.parquet");

let test_file = gen_data(path, opt.scale_factor, opt.page_size, opt.row_group_size)?;
let mut props_builder = WriterProperties::builder();

if let Some(s) = opt.page_size {
props_builder = props_builder
.set_data_pagesize_limit(s)
.set_write_batch_size(s);
Member:

This is looking good! 👍 Once I tried to create pages with a row count less than DEFAULT_WRITE_BATCH_SIZE, forgetting to set write_batch_size puzzled me for a while 😂

Member:

Maybe we can add this info in arrow-rs 🤔

Contributor Author:

> Maybe we can add this info in arrow-rs 🤔

@Ted-Jiang I am not sure what you mean. Do you mean improving the documentation here?

https://docs.rs/parquet/latest/parquet/file/properties/struct.WriterPropertiesBuilder.html#method.set_write_batch_size

Ted-Jiang (Member), Nov 8, 2022:

@alamb
I mean setting it here: https://docs.rs/parquet/latest/parquet/file/properties/struct.WriterPropertiesBuilder.html#method.set_data_page_row_count_limit

If a user only sets set_data_page_row_count_limit to 100 (without also setting set_write_batch_size to 100), which is less than the default DEFAULT_WRITE_BATCH_SIZE (1024), I think it still cuts pages at 1024 rows 🤔.

Member:

I will test it.

Member:

Test shows:
First I set this in single_file_small_data_pages:

let props = WriterProperties::builder()
    .set_data_page_row_count_limit(100)
    .build();

Then I ran parquet-tools column-index -c service /Users/yangjiang/data_8311.parquet:

offset index for column service:
                          offset   compressed size       first row index
page-0                        29                48                     0
page-1                        77                48                  1024
page-2                       125                48                  2048
page-3                       173                48                  3072
page-4                       221                48                  4096
page-5                       269                48                  5120
page-6                       317                48                  6144
page-7                       365                48                  7168
page-8                       413                48                  8192
page-9                       461                48                  9216
page-10                      509                48                 10240
page-11                      557                48                 11264
page-12                      605                48                 12288
page-13                      653                48                 13312
page-14                      701                48                 14336
page-15                      749                48                 15360
page-16                      797                48                 16384
page-17                      845                48                 17408
page-18                      893                48                 18432
page-19                      941                48

Ted-Jiang (Member), Nov 8, 2022:

So it only works when data_page_row_count_limit is less than write_batch_size 😂
I think that is because arrow-rs is a vectored writer and writes one batch at a time 🤔

Ted-Jiang (Member), Nov 8, 2022:

Oh, it is already documented there 😭, I should read the docs more carefully 😩

> Note: this is a best effort limit based on the write batch size

Contributor Author:

👍 I filed apache/arrow-rs#3068 to try and make the comments somewhat clearer
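Summing up the thread above, here is a minimal sketch of writer properties that should actually yield ~100-row pages, assuming the parquet crate's WriterPropertiesBuilder methods linked in the discussion (the function name and the 100-row figure are illustrative, taken from the test above):

use parquet::file::properties::WriterProperties;

// Both limits need to be set: the writer only checks the page row-count limit
// after flushing a write batch, so with the default batch size (1024 rows) a
// 100-row page limit is never reached and pages are still cut every 1024 rows.
fn small_page_props() -> WriterProperties {
    WriterProperties::builder()
        .set_data_page_row_count_limit(100)
        .set_write_batch_size(100)
        .build()
}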

}

if let Some(s) = opt.row_group_size {
props_builder = props_builder.set_max_row_group_size(s);
}

let test_file = gen_data(path, opt.scale_factor, props_builder.build())?;

run_benchmarks(&mut ctx, &test_file, opt.iterations, opt.debug).await?;

@@ -137,14 +150,9 @@ async fn run_benchmarks(
println!("Using scan options {:?}", scan_options);
for i in 0..iterations {
let start = Instant::now();
let rows = exec_scan(
ctx,
test_file,
filter_expr.clone(),
scan_options.clone(),
debug,
)
.await?;
let rows =
exec_scan(ctx, test_file, filter_expr.clone(), *scan_options, debug)
.await?;
println!(
"Iteration {} returned {} rows in {} ms",
i,
@@ -179,17 +187,11 @@ async fn exec_scan(
fn gen_data(
path: PathBuf,
scale_factor: f32,
page_size: Option<usize>,
row_group_size: Option<usize>,
props: WriterProperties,
) -> Result<TestParquetFile> {
let generator = AccessLogGenerator::new();

let num_batches = 100_f32 * scale_factor;

TestParquetFile::try_new(
path,
generator.take(num_batches as usize),
page_size,
row_group_size,
)
TestParquetFile::try_new(path, props, generator.take(num_batches as usize))
}
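One small detail from the run_benchmarks hunk above: the call now passes *scan_options instead of scan_options.clone(), which implies the options type derives Copy. A rough sketch of what such a struct could look like follows; the field names are illustrative guesses, not taken from this diff (the real type lives in parquet_test_utils):

// Illustrative sketch only: a small, Copy-able options struct lets callers
// pass it by value (`*scan_options`) instead of cloning on every iteration.
#[derive(Debug, Clone, Copy)]
pub struct ParquetScanOptions {
    pub pushdown_filters: bool,
    pub reorder_filters: bool,
    pub enable_page_index: bool,
}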
7 changes: 7 additions & 0 deletions datafusion/core/src/physical_optimizer/pruning.rs
@@ -51,6 +51,7 @@ use datafusion_expr::expr_rewriter::{ExprRewritable, ExprRewriter};
use datafusion_expr::utils::expr_to_columns;
use datafusion_expr::{binary_expr, cast, try_cast, ExprSchemable};
use datafusion_physical_expr::create_physical_expr;
use log::trace;

/// Interface to pass statistics information to [`PruningPredicate`]
///
@@ -415,6 +416,12 @@ fn build_statistics_record_batch<S: PruningStatistics>(
let mut options = RecordBatchOptions::default();
options.row_count = Some(statistics.num_containers());

trace!(
"Creating statistics batch for {:#?} with {:#?}",
required_columns,
arrays
);

RecordBatch::try_new_with_options(schema, arrays, &options).map_err(|err| {
DataFusionError::Plan(format!("Can not create statistics record batch: {}", err))
})
Original file line number Diff line number Diff line change
@@ -22,7 +22,7 @@ use arrow::{array::ArrayRef, datatypes::SchemaRef, error::ArrowError};
use datafusion_common::{Column, DataFusionError, Result};
use datafusion_expr::utils::expr_to_columns;
use datafusion_optimizer::utils::split_conjunction;
use log::{debug, error};
use log::{debug, error, trace};
use parquet::{
arrow::arrow_reader::{RowSelection, RowSelector},
errors::ParquetError,
@@ -143,15 +143,19 @@ pub(crate) fn build_page_filter(
}),
);
} else {
trace!(
"Did not have enough metadata to prune with page indexes, falling back, falling back to all rows",
);
// fallback select all rows
let all_selected =
vec![RowSelector::select(groups[*r].num_rows() as usize)];
selectors.push(all_selected);
}
}
debug!(
"Use filter and page index create RowSelection {:?} from predicate:{:?}",
&selectors, predicate
"Use filter and page index create RowSelection {:?} from predicate: {:?}",
&selectors,
predicate.predicate_expr(),
);
row_selections.push_back(selectors.into_iter().flatten().collect::<Vec<_>>());
}
@@ -321,7 +325,7 @@ fn prune_pages_in_one_row_group(
assert_eq!(row_vec.len(), values.len());
let mut sum_row = *row_vec.first().unwrap();
let mut selected = *values.first().unwrap();

trace!("Pruned to to {:?} using {:?}", values, pruning_stats);
for (i, &f) in values.iter().skip(1).enumerate() {
if f == selected {
sum_row += *row_vec.get(i).unwrap();
@@ -376,6 +380,7 @@ fn create_row_count_in_each_page(

/// Wraps one col page_index in one rowGroup statistics in a way
/// that implements [`PruningStatistics`]
#[derive(Debug)]
struct PagesPruningStatistics<'a> {
col_page_indexes: &'a Index,
col_offset_indexes: &'a Vec<PageLocation>,
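A closing note on the trace!/debug! statements added across these files: they only emit output when a log backend is installed. Below is a minimal sketch of wiring one up in a standalone binary, assuming the env_logger crate as the backend (an assumption; this diff does not choose a backend):

// Run with e.g. RUST_LOG=datafusion=trace to see the new pruning messages.
fn main() {
    env_logger::init();
    log::trace!("logging initialized");
}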