Skip to content

Commit dd683bd

Browse files
committed
Remove file_length from the ParquetPushDecoderBuilder
1 parent ee64444 commit dd683bd

File tree

2 files changed

+68
-101
lines changed

2 files changed

+68
-101
lines changed

parquet/src/arrow/async_reader/mod.rs

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ pub use metadata::*;
5454
mod store;
5555

5656
use crate::DecodeResult;
57-
use crate::arrow::push_decoder::{ParquetPushDecoder, ParquetPushDecoderBuilder};
57+
use crate::arrow::push_decoder::{NoInput, ParquetPushDecoder, ParquetPushDecoderBuilder};
5858
#[cfg(feature = "object_store")]
5959
pub use store::*;
6060

@@ -500,9 +500,7 @@ impl<T: AsyncFileReader + Send + 'static> ParquetRecordBatchStreamBuilder<T> {
500500
let projected_schema = Arc::new(Schema::new(projected_fields));
501501

502502
let decoder = ParquetPushDecoderBuilder {
503-
// Async reader doesn't know the overall size of the input, but it
504-
// is not required for decoding as we will already have the metadata
505-
input: 0,
503+
input: NoInput,
506504
metadata,
507505
schema,
508506
fields,

parquet/src/arrow/push_decoder/mod.rs

Lines changed: 66 additions & 97 deletions
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ use std::sync::Arc;
8181
/// # let parquet_metadata = Arc::new(parquet_metadata);
8282
/// // The metadata is required to create the decoder
8383
/// let mut decoder =
84-
/// ParquetPushDecoderBuilder::try_new_decoder(file_length, parquet_metadata)
84+
/// ParquetPushDecoderBuilder::try_new_decoder(parquet_metadata)
8585
/// .unwrap()
8686
/// // Optionally configure the decoder, e.g. batch size
8787
/// .with_batch_size(1024)
@@ -110,7 +110,19 @@ use std::sync::Arc;
110110
/// }
111111
/// }
112112
/// ```
113-
pub type ParquetPushDecoderBuilder = ArrowReaderBuilder<u64>; // u64 is the file length, if known
113+
pub type ParquetPushDecoderBuilder = ArrowReaderBuilder<NoInput>;
114+
115+
/// Type that represents "No input" for the [`ParquetPushDecoderBuilder`]
116+
///
117+
/// There is no "input" for the push decoder by design (the idea is that
118+
/// the caller pushes data to the decoder as needed).
119+
///
120+
/// However, [`ArrowReaderBuilder`] is shared with the sync and async readers,
121+
/// which DO have an `input`. To support reusing the same builder code for
122+
/// all three types of decoders, we define this `NoInput` for the push decoder to
123+
/// denote in the type system that there is no input.
124+
#[derive(Debug, Clone, Copy)]
125+
pub struct NoInput;
114126

115127
/// Methods for building a ParquetDecoder. See the base [`ArrowReaderBuilder`] for
116128
/// more options that can be configured.
@@ -122,15 +134,8 @@ impl ParquetPushDecoderBuilder {
122134
/// [`ParquetMetadataDecoder`]: crate::file::metadata::ParquetMetaDataPushDecoder
123135
///
124136
/// See example on [`ParquetPushDecoderBuilder`]
125-
pub fn try_new_decoder(
126-
file_len: u64,
127-
parquet_metadata: Arc<ParquetMetaData>,
128-
) -> Result<Self, ParquetError> {
129-
Self::try_new_decoder_with_options(
130-
file_len,
131-
parquet_metadata,
132-
ArrowReaderOptions::default(),
133-
)
137+
pub fn try_new_decoder(parquet_metadata: Arc<ParquetMetaData>) -> Result<Self, ParquetError> {
138+
Self::try_new_decoder_with_options(parquet_metadata, ArrowReaderOptions::default())
134139
}
135140

136141
/// Create a new `ParquetDecoderBuilder` for configuring a Parquet decoder for the given file
@@ -139,27 +144,26 @@ impl ParquetPushDecoderBuilder {
139144
/// This is similar to [`Self::try_new_decoder`] but allows configuring
140145
/// options such as Arrow schema
141146
pub fn try_new_decoder_with_options(
142-
file_len: u64,
143147
parquet_metadata: Arc<ParquetMetaData>,
144148
arrow_reader_options: ArrowReaderOptions,
145149
) -> Result<Self, ParquetError> {
146150
let arrow_reader_metadata =
147151
ArrowReaderMetadata::try_new(parquet_metadata, arrow_reader_options)?;
148-
Ok(Self::new_with_metadata(file_len, arrow_reader_metadata))
152+
Ok(Self::new_with_metadata(arrow_reader_metadata))
149153
}
150154

151155
/// Create a new `ParquetDecoderBuilder` given [`ArrowReaderMetadata`].
152156
///
153157
/// See [`ArrowReaderMetadata::try_new`] for how to create the metadata from
154158
/// the Parquet metadata and reader options.
155-
pub fn new_with_metadata(file_len: u64, arrow_reader_metadata: ArrowReaderMetadata) -> Self {
156-
Self::new_builder(file_len, arrow_reader_metadata)
159+
pub fn new_with_metadata(arrow_reader_metadata: ArrowReaderMetadata) -> Self {
160+
Self::new_builder(NoInput, arrow_reader_metadata)
157161
}
158162

159163
/// Create a [`ParquetPushDecoder`] with the configured options
160164
pub fn build(self) -> Result<ParquetPushDecoder, ParquetError> {
161165
let Self {
162-
input: file_len,
166+
input: NoInput,
163167
metadata: parquet_metadata,
164168
schema: _,
165169
fields,
@@ -179,6 +183,7 @@ impl ParquetPushDecoderBuilder {
179183
row_groups.unwrap_or_else(|| (0..parquet_metadata.num_row_groups()).collect());
180184

181185
// Prepare to build RowGroup readers
186+
let file_len = 0; // not used in push decoder
182187
let buffers = PushBuffers::new(file_len);
183188
let row_group_reader_builder = RowGroupReaderBuilder::new(
184189
batch_size,
@@ -600,13 +605,10 @@ mod test {
600605
/// available in memory
601606
#[test]
602607
fn test_decoder_all_data() {
603-
let mut decoder = ParquetPushDecoderBuilder::try_new_decoder(
604-
test_file_len(),
605-
test_file_parquet_metadata(),
606-
)
607-
.unwrap()
608-
.build()
609-
.unwrap();
608+
let mut decoder = ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata())
609+
.unwrap()
610+
.build()
611+
.unwrap();
610612

611613
decoder
612614
.push_range(test_file_range(), TEST_FILE_DATA.clone())
@@ -629,13 +631,10 @@ mod test {
629631
/// fetched as needed
630632
#[test]
631633
fn test_decoder_incremental() {
632-
let mut decoder = ParquetPushDecoderBuilder::try_new_decoder(
633-
test_file_len(),
634-
test_file_parquet_metadata(),
635-
)
636-
.unwrap()
637-
.build()
638-
.unwrap();
634+
let mut decoder = ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata())
635+
.unwrap()
636+
.build()
637+
.unwrap();
639638

640639
let mut results = vec![];
641640

@@ -668,13 +667,10 @@ mod test {
668667
/// Decode the entire file incrementally, simulating partial reads
669668
#[test]
670669
fn test_decoder_partial() {
671-
let mut decoder = ParquetPushDecoderBuilder::try_new_decoder(
672-
test_file_len(),
673-
test_file_parquet_metadata(),
674-
)
675-
.unwrap()
676-
.build()
677-
.unwrap();
670+
let mut decoder = ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata())
671+
.unwrap()
672+
.build()
673+
.unwrap();
678674

679675
// First row group, expect a single request for all data needed to read "a" and "b"
680676
let ranges = expect_needs_data(decoder.try_decode());
@@ -712,11 +708,8 @@ mod test {
712708
/// only a single request per row group
713709
#[test]
714710
fn test_decoder_selection_does_one_request() {
715-
let builder = ParquetPushDecoderBuilder::try_new_decoder(
716-
test_file_len(),
717-
test_file_parquet_metadata(),
718-
)
719-
.unwrap();
711+
let builder =
712+
ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata()).unwrap();
720713

721714
let schema_descr = builder.metadata().file_metadata().schema_descr_ptr();
722715

@@ -750,11 +743,8 @@ mod test {
750743
/// of the data needed for the filter at a time simulating partial reads.
751744
#[test]
752745
fn test_decoder_single_filter_partial() {
753-
let builder = ParquetPushDecoderBuilder::try_new_decoder(
754-
test_file_len(),
755-
test_file_parquet_metadata(),
756-
)
757-
.unwrap();
746+
let builder =
747+
ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata()).unwrap();
758748

759749
// Values in column "a" range 0..399
760750
// First filter: "a" > 250 (nothing in Row Group 0, both data pages in Row Group 1)
@@ -811,11 +801,8 @@ mod test {
811801
/// Decode with a filter where we also skip one of the RowGroups via a RowSelection
812802
#[test]
813803
fn test_decoder_single_filter_and_row_selection() {
814-
let builder = ParquetPushDecoderBuilder::try_new_decoder(
815-
test_file_len(),
816-
test_file_parquet_metadata(),
817-
)
818-
.unwrap();
804+
let builder =
805+
ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata()).unwrap();
819806

820807
// Values in column "a" range 0..399
821808
// First filter: "a" > 250 (nothing in Row Group 0, last data page in Row Group 1)
@@ -866,11 +853,8 @@ mod test {
866853
#[test]
867854
fn test_decoder_multi_filters() {
868855
// Create a decoder for decoding parquet data (note it does not have any IO / readers)
869-
let builder = ParquetPushDecoderBuilder::try_new_decoder(
870-
test_file_len(),
871-
test_file_parquet_metadata(),
872-
)
873-
.unwrap();
856+
let builder =
857+
ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata()).unwrap();
874858

875859
// Values in column "a" range 0..399
876860
// Values in column "b" range 400..799
@@ -951,11 +935,8 @@ mod test {
951935
#[test]
952936
fn test_decoder_reuses_filter_pages() {
953937
// Create a decoder for decoding parquet data (note it does not have any IO / readers)
954-
let builder = ParquetPushDecoderBuilder::try_new_decoder(
955-
test_file_len(),
956-
test_file_parquet_metadata(),
957-
)
958-
.unwrap();
938+
let builder =
939+
ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata()).unwrap();
959940

960941
// Values in column "a" range 0..399
961942
// First filter: "a" > 250 (nothing in Row Group 0, last data page in Row Group 1)
@@ -1002,11 +983,8 @@ mod test {
1002983

1003984
#[test]
1004985
fn test_decoder_empty_filters() {
1005-
let builder = ParquetPushDecoderBuilder::try_new_decoder(
1006-
test_file_len(),
1007-
test_file_parquet_metadata(),
1008-
)
1009-
.unwrap();
986+
let builder =
987+
ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata()).unwrap();
1010988
let schema_descr = builder.metadata().file_metadata().schema_descr_ptr();
1011989

1012990
// only read column "c", but with empty filters
@@ -1044,17 +1022,14 @@ mod test {
10441022

10451023
#[test]
10461024
fn test_decoder_offset_limit() {
1047-
let mut decoder = ParquetPushDecoderBuilder::try_new_decoder(
1048-
test_file_len(),
1049-
test_file_parquet_metadata(),
1050-
)
1051-
.unwrap()
1052-
// skip entire first row group (200 rows) and first 25 rows of second row group
1053-
.with_offset(225)
1054-
// and limit to 20 rows
1055-
.with_limit(20)
1056-
.build()
1057-
.unwrap();
1025+
let mut decoder = ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata())
1026+
.unwrap()
1027+
// skip entire first row group (200 rows) and first 25 rows of second row group
1028+
.with_offset(225)
1029+
// and limit to 20 rows
1030+
.with_limit(20)
1031+
.build()
1032+
.unwrap();
10581033

10591034
// First row group should be skipped,
10601035

@@ -1073,14 +1048,11 @@ mod test {
10731048
#[test]
10741049
fn test_decoder_row_group_selection() {
10751050
// take only the second row group
1076-
let mut decoder = ParquetPushDecoderBuilder::try_new_decoder(
1077-
test_file_len(),
1078-
test_file_parquet_metadata(),
1079-
)
1080-
.unwrap()
1081-
.with_row_groups(vec![1])
1082-
.build()
1083-
.unwrap();
1051+
let mut decoder = ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata())
1052+
.unwrap()
1053+
.with_row_groups(vec![1])
1054+
.build()
1055+
.unwrap();
10841056

10851057
// First row group should be skipped,
10861058

@@ -1099,17 +1071,14 @@ mod test {
10991071
#[test]
11001072
fn test_decoder_row_selection() {
11011073
// take only the second row group
1102-
let mut decoder = ParquetPushDecoderBuilder::try_new_decoder(
1103-
test_file_len(),
1104-
test_file_parquet_metadata(),
1105-
)
1106-
.unwrap()
1107-
.with_row_selection(RowSelection::from(vec![
1108-
RowSelector::skip(225), // skip first row group and 25 rows of second])
1109-
RowSelector::select(20), // take 20 rows
1110-
]))
1111-
.build()
1112-
.unwrap();
1074+
let mut decoder = ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata())
1075+
.unwrap()
1076+
.with_row_selection(RowSelection::from(vec![
1077+
RowSelector::skip(225), // skip first row group and 25 rows of second])
1078+
RowSelector::select(20), // take 20 rows
1079+
]))
1080+
.build()
1081+
.unwrap();
11131082

11141083
// First row group should be skipped,
11151084

0 commit comments

Comments
 (0)