Merged
97 changes: 97 additions & 0 deletions parquet/src/arrow/async_reader/mod.rs
@@ -715,6 +715,11 @@ where

/// Compute which columns are used in filters and the final (output) projection
fn compute_cache_projection(&self, projection: &ProjectionMask) -> Option<ProjectionMask> {
// Do not compute the projection mask if the predicate cache is disabled
if self.max_predicate_cache_size == 0 {
return None;
}

let filters = self.filter.as_ref()?;
let mut cache_projection = filters.predicates.first()?.projection().clone();
for predicate in filters.predicates.iter() {
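(The rest of `compute_cache_projection` is collapsed in this diff view.) As context for the new early return, a minimal caller-side sketch of disabling the cache; `file` and `row_filter` are hypothetical stand-ins for any `AsyncFileReader` (e.g. a `tokio::fs::File`) and a prebuilt `RowFilter`, and are not part of this PR:

```rust
// Sketch only: with a limit of 0, compute_cache_projection returns None,
// so no decoded pages are cached between predicate evaluation and output.
let stream = ParquetRecordBatchStreamBuilder::new(file)
    .await?
    .with_row_filter(row_filter)
    .with_max_predicate_cache_size(0) // 0 disables the predicate cache
    .build()?;
```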
@@ -2611,4 +2616,96 @@ mod tests {
// error we want to reproduce.
let _result: Vec<_> = stream.try_collect().await.unwrap();
}

#[tokio::test]
async fn test_predicate_cache_disabled() {
let k = Int32Array::from_iter_values(0..10);
let data = RecordBatch::try_from_iter([("k", Arc::new(k) as ArrayRef)]).unwrap();

let mut buf = Vec::new();
// both the page row limit and the write batch size are set to 1 to create one page per row
let props = WriterProperties::builder()
.set_data_page_row_count_limit(1)
.set_write_batch_size(1)
.set_max_row_group_size(10)
.set_write_page_header_statistics(true)
.build();
let mut writer = ArrowWriter::try_new(&mut buf, data.schema(), Some(props)).unwrap();
writer.write(&data).unwrap();
writer.close().unwrap();

let data = Bytes::from(buf);
let metadata = ParquetMetaDataReader::new()
.with_page_index_policy(PageIndexPolicy::Required)
.parse_and_finish(&data)
.unwrap();
let parquet_schema = metadata.file_metadata().schema_descr_ptr();

// the filter is not `Clone`, so we use a closure to build a fresh one for each reader
Review comment (Contributor):
yeah, this is something that makes the filters very tricky to handle internally. Nothing to change for this PR, I am just observing

let build_filter = || {
let scalar = Int32Array::from_iter_values([5]);
let predicate = ArrowPredicateFn::new(
ProjectionMask::leaves(&parquet_schema, vec![0]),
move |batch| eq(batch.column(0), &Scalar::new(&scalar)),
);
RowFilter::new(vec![Box::new(predicate)])
};

// select only one of the pages
let selection = RowSelection::from(vec![RowSelector::skip(5), RowSelector::select(1)]);

let options = ArrowReaderOptions::new().with_page_index_policy(PageIndexPolicy::Required);
let reader_metadata = ArrowReaderMetadata::try_new(metadata.into(), options).unwrap();

// using the predicate cache (default)
let reader_with_cache = TestReader::new(data.clone());
let requests_with_cache = reader_with_cache.requests.clone();
let stream = ParquetRecordBatchStreamBuilder::new_with_metadata(
reader_with_cache,
reader_metadata.clone(),
)
.with_batch_size(1000)
.with_row_selection(selection.clone())
.with_row_filter(build_filter())
.build()
.unwrap();
let batches_with_cache: Vec<_> = stream.try_collect().await.unwrap();

// disabling the predicate cache
let reader_without_cache = TestReader::new(data);
let requests_without_cache = reader_without_cache.requests.clone();
let stream = ParquetRecordBatchStreamBuilder::new_with_metadata(
reader_without_cache,
reader_metadata,
)
.with_batch_size(1000)
.with_row_selection(selection)
.with_row_filter(build_filter())
.with_max_predicate_cache_size(0) // disabling it by setting the limit to 0
.build()
.unwrap();
let batches_without_cache: Vec<_> = stream.try_collect().await.unwrap();

assert_eq!(batches_with_cache, batches_without_cache);

let requests_with_cache = requests_with_cache.lock().unwrap();
let requests_without_cache = requests_without_cache.lock().unwrap();

// fewer requests are made without the predicate cache
assert_eq!(requests_with_cache.len(), 11);
assert_eq!(requests_without_cache.len(), 2);

// fewer bytes are retrieved without the predicate cache
assert_eq!(
requests_with_cache.iter().map(|r| r.len()).sum::<usize>(),
433
);
assert_eq!(
requests_without_cache
.iter()
.map(|r| r.len())
.sum::<usize>(),
92
);
}
}
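To run the new test locally, something like `cargo test -p parquet --all-features predicate_cache_disabled` should work (the exact feature flags required by the `async_reader` tests are assumed here).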