142 changes: 120 additions & 22 deletions parquet/src/arrow/array_reader/row_number.rs
@@ -21,7 +21,7 @@
use crate::file::metadata::{ParquetMetaData, RowGroupMetaData};
use arrow_array::{ArrayRef, Int64Array};
use arrow_schema::DataType;
use std::any::Any;
-use std::collections::HashSet;
+use std::collections::HashMap;
use std::sync::Arc;

pub(crate) struct RowNumberReader {
@@ -34,35 +34,40 @@ impl RowNumberReader {
        parquet_metadata: &'a ParquetMetaData,
        row_groups: impl Iterator<Item = &'a RowGroupMetaData>,
    ) -> Result<Self> {
-        // Collect ordinals from the selected row groups
-        let selected_ordinals: HashSet<i16> = row_groups
-            .map(|rg| {
-                rg.ordinal().ok_or_else(|| {
-                    ParquetError::General(
-                        "Row group missing ordinal field, required to compute row numbers"
-                            .to_string(),
-                    )
-                })
-            })
-            .collect::<Result<_>>()?;
-
-        // Iterate through all row groups once, computing first_row_index and creating ranges
-        // This is O(M) where M is total row groups, much better than O(N * O) where N is selected
+        // Pass 1: Build a map from ordinal to first_row_index
+        // This is O(M) where M is the total number of row groups in the file
+        let mut ordinal_to_offset: HashMap<i16, i64> = HashMap::new();
        let mut first_row_index: i64 = 0;
-        let mut ranges = Vec::new();

        for rg in parquet_metadata.row_groups() {
            if let Some(ordinal) = rg.ordinal() {
-                if selected_ordinals.contains(&ordinal) {
-                    ranges.push((ordinal, first_row_index..first_row_index + rg.num_rows()));
-                }
+                ordinal_to_offset.insert(ordinal, first_row_index);
            }
            first_row_index += rg.num_rows();
        }

-        // Sort ranges by ordinal to maintain original row group order
-        ranges.sort_by_key(|(ordinal, _)| *ordinal);
-        let ranges: Vec<_> = ranges.into_iter().map(|(_, range)| range).collect();
+        // Pass 2: Build ranges in the order specified by the row_groups iterator
+        // This is O(N) where N is the number of selected row groups
+        // This preserves the user's requested order instead of sorting by ordinal
+        let ranges: Vec<_> = row_groups
+            .map(|rg| {
+                let ordinal = rg.ordinal().ok_or_else(|| {
+                    ParquetError::General(
+                        "Row group missing ordinal field, required to compute row numbers"
+                            .to_string(),
+                    )
+                })?;
+
+                let offset = ordinal_to_offset.get(&ordinal).ok_or_else(|| {
Contributor comment: tiny nit, to simplify L68 below?

Suggested change:
-                let offset = ordinal_to_offset.get(&ordinal).ok_or_else(|| {
+                let offset = *ordinal_to_offset.get(&ordinal).ok_or_else(|| {
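(With the suggested leading `*`, `offset` is bound by value as a plain `i64`, so the closure's final `Ok(*offset..*offset + rg.num_rows())` line could drop both dereferences; presumably that is the "L68" simplification the reviewer is pointing at.)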
+                    ParquetError::General(format!(
+                        "Row group with ordinal {} not found in metadata",
+                        ordinal
+                    ))
+                })?;
+
+                Ok(*offset..*offset + rg.num_rows())
+            })
+            .collect::<Result<_>>()?;

        Ok(Self {
            buffered_row_numbers: Vec::new(),
@@ -106,3 +111,96 @@ impl ArrayReader for RowNumberReader {
        None
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::basic::Type as PhysicalType;
    use crate::file::metadata::{
        ColumnChunkMetaData, FileMetaData, ParquetMetaData, RowGroupMetaData,
    };
    use crate::schema::types::{SchemaDescriptor, Type as SchemaType};
    use std::sync::Arc;

    fn create_test_schema() -> Arc<SchemaDescriptor> {
        let schema = SchemaType::group_type_builder("schema")
            .with_fields(vec![Arc::new(
                SchemaType::primitive_type_builder("test_col", PhysicalType::INT32)
                    .build()
                    .unwrap(),
            )])
            .build()
            .unwrap();
        Arc::new(SchemaDescriptor::new(Arc::new(schema)))
    }

    fn create_test_parquet_metadata(row_groups: Vec<(i16, i64)>) -> ParquetMetaData {
        let schema_descr = create_test_schema();

        let mut row_group_metas = vec![];
        for (ordinal, num_rows) in row_groups {
            let columns: Vec<_> = schema_descr
                .columns()
                .iter()
                .map(|col| ColumnChunkMetaData::builder(col.clone()).build().unwrap())
                .collect();

            let row_group = RowGroupMetaData::builder(schema_descr.clone())
                .set_num_rows(num_rows)
                .set_ordinal(ordinal)
                .set_total_byte_size(100)
                .set_column_metadata(columns)
                .build()
                .unwrap();
            row_group_metas.push(row_group);
        }

        let total_rows: i64 = row_group_metas.iter().map(|rg| rg.num_rows()).sum();
        let file_metadata = FileMetaData::new(
            1,            // version
            total_rows,   // num_rows
            None,         // created_by
            None,         // key_value_metadata
            schema_descr, // schema_descr
            None,         // column_orders
        );

        ParquetMetaData::new(file_metadata, row_group_metas)
    }

    #[test]
    fn test_row_number_reader_reverse_order() {
        // Create metadata with 3 row groups, each with 2 rows
        let metadata = create_test_parquet_metadata(vec![
            (0, 2), // Row group 0: ordinal=0, rows 0-1
            (1, 2), // Row group 1: ordinal=1, rows 2-3
            (2, 2), // Row group 2: ordinal=2, rows 4-5
        ]);

        // Select only row groups with ordinals 2 and 0 (in that order)
        // This means we want row group 2 first, then row group 0, skipping row group 1
        let selected_row_groups: Vec<_> = vec![
            &metadata.row_groups()[2], // ordinal 2
            &metadata.row_groups()[0], // ordinal 0
        ];

        let mut reader =
            RowNumberReader::try_new(&metadata, selected_row_groups.into_iter()).unwrap();

        // Read all row numbers
        let num_read = reader.read_records(6).unwrap();
        assert_eq!(num_read, 4); // Should read 4 rows total (2 from each selected group)

        let array = reader.consume_batch().unwrap();
        let row_numbers = array.as_any().downcast_ref::<Int64Array>().unwrap();

        // Expected: row group 2 first (rows 4-5), then row group 0 (rows 0-1)
        let expected = vec![4, 5, 0, 1];
        let actual: Vec<i64> = row_numbers.iter().map(|v| v.unwrap()).collect();

        assert_eq!(
            actual, expected,
            "Row numbers should match the order of selected row groups, not file order"
        );
    }
}
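Aside: the fix above is an instance of a general two-pass pattern: index everything once in storage order, then resolve the selection in request order. A minimal standalone sketch of that pattern, with hypothetical names and simplified types rather than the crate's actual API:

```rust
use std::collections::HashMap;
use std::ops::Range;

// Pass 1 walks all groups in file order to learn each group's starting row;
// pass 2 resolves the caller's selection in the caller's order.
fn selected_ranges(
    group_sizes: &[(i16, i64)], // (ordinal, num_rows) in file order
    selection: &[i16],          // ordinals in the caller's requested order
) -> Option<Vec<Range<i64>>> {
    // Pass 1: O(M) over every group in the file.
    let mut offsets: HashMap<i16, (i64, i64)> = HashMap::new();
    let mut first_row = 0i64;
    for &(ordinal, num_rows) in group_sizes {
        offsets.insert(ordinal, (first_row, num_rows));
        first_row += num_rows;
    }
    // Pass 2: O(N) over the selection, preserving its order (no sort).
    selection
        .iter()
        .map(|ordinal| {
            let &(start, len) = offsets.get(ordinal)?;
            Some(start..start + len)
        })
        .collect()
}

fn main() {
    // Three groups of 2 rows each; select ordinal 2, then ordinal 0.
    let ranges = selected_ranges(&[(0, 2), (1, 2), (2, 2)], &[2, 0]).unwrap();
    assert_eq!(ranges, vec![4..6, 0..2]);
}
```

Pass 1 stays O(M) no matter how the selection reorders or skips ordinals, and pass 2 never sorts, so the caller's requested order survives, which is exactly what the reverse-order test above asserts.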
102 changes: 96 additions & 6 deletions parquet/src/arrow/arrow_reader/mod.rs
@@ -625,12 +625,11 @@ impl ArrowReaderOptions {
    pub fn with_virtual_columns(self, virtual_columns: Vec<FieldRef>) -> Self {
Contributor comment: I also made this comment in the original PR, but maybe we could change this to

    pub fn set_virtual_columns(&mut self, virtual_columns: Vec<FieldRef>) -> Result<()> {

to avoid the panic. If we squeak this in before 57.1.0 ships it wouldn't be an API change.

Author reply: Attaching a link for easier discussion: #8715 (comment)

Not having the ability to chain seems a bit degrading, though users can just use the ? operator for that (in which case it just panics, but that's fine, it's a choice). Having a proper builder would be nice, but it's then definitely an API change, and a bit less of a quick fix for the main thing this PR is intended to solve.

I'm fine with changing to Result(). Would you prefer that? Are others on board?

scovich (Contributor), Nov 18, 2025: Why is Result<Self> bad, sorry? Call it try_with_xxx if naming conventions are the problem?

Contributor reply: Sorry, I was upthumbing "Not having the ability to chain seems a bit degrading, though users can just use the ? operator for that", meaning I'd be on board with Result<Self>.

Contributor reply (quoting "I'm fine with changing to Result(). Would you prefer that? Are others on board?"): I agree, that sounds better to me.
        // Validate that all fields are virtual columns
        for field in &virtual_columns {
-            if !is_virtual_column(field) {
-                panic!(
-                    "Field '{}' is not a virtual column. Virtual columns must have extension type names starting with 'arrow.virtual.'",
-                    field.name()
-                );
-            }
+            assert!(
+                is_virtual_column(field),
+                "Field '{}' is not a virtual column. Virtual columns must have extension type names starting with 'arrow.virtual.'",
+                field.name()
+            );
        }
        Self {
            virtual_columns,
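For reference, a minimal sketch of the fallible variant discussed in the thread above. The name `try_with_virtual_columns` follows scovich's `try_with_xxx` suggestion and the body just mirrors the validation in the diff; it assumes crate-internal context where `ArrowReaderOptions`, `FieldRef`, and `is_virtual_column` are in scope, and is not the API that actually shipped:

```rust
use crate::errors::{ParquetError, Result};

impl ArrowReaderOptions {
    /// Hypothetical fallible builder: same validation as with_virtual_columns,
    /// but reports the offending field as an error instead of panicking.
    pub fn try_with_virtual_columns(self, virtual_columns: Vec<FieldRef>) -> Result<Self> {
        for field in &virtual_columns {
            if !is_virtual_column(field) {
                return Err(ParquetError::General(format!(
                    "Field '{}' is not a virtual column. Virtual columns must have \
                     extension type names starting with 'arrow.virtual.'",
                    field.name()
                )));
            }
        }
        Ok(Self { virtual_columns, ..self })
    }
}
```

Callers would then chain with `?`, e.g. `ArrowReaderOptions::new().try_with_virtual_columns(cols)?`, keeping the builder style while avoiding the panic.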
@@ -5546,6 +5545,97 @@ pub(crate) mod tests {
        );
    }

    #[test]
    fn test_read_row_numbers_row_group_order() -> Result<()> {
        // Make a parquet file with 100 rows split across 2 row groups
        let array = Int64Array::from_iter_values(5000..5100);
        let batch = RecordBatch::try_from_iter([("col", Arc::new(array) as ArrayRef)])?;
        let mut buffer = Vec::new();
        let options = WriterProperties::builder()
            .set_max_row_group_size(50)
            .build();
        let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema().clone(), Some(options))?;
        // write in 10 row batches as the size limits are enforced after each batch
        for batch_chunk in (0..10).map(|i| batch.slice(i * 10, 10)) {
            writer.write(&batch_chunk)?;
        }
        writer.close()?;

        let row_number_field = Arc::new(
            Field::new("row_number", ArrowDataType::Int64, false).with_extension_type(RowNumber),
        );

        let buffer = Bytes::from(buffer);

        let options =
            ArrowReaderOptions::new().with_virtual_columns(vec![row_number_field.clone()]);

        // read out with normal options
        let arrow_reader =
            ParquetRecordBatchReaderBuilder::try_new_with_options(buffer.clone(), options.clone())?
                .build()?;

        assert_eq!(
            ValuesAndRowNumbers {
                values: (5000..5100).collect(),
                row_numbers: (0..100).collect()
            },
            ValuesAndRowNumbers::new_from_reader(arrow_reader)
        );

        // Now read, out of order row groups
        let arrow_reader = ParquetRecordBatchReaderBuilder::try_new_with_options(buffer, options)?
            .with_row_groups(vec![1, 0])
            .build()?;

        assert_eq!(
            ValuesAndRowNumbers {
                values: (5050..5100).chain(5000..5050).collect(),
                row_numbers: (50..100).chain(0..50).collect(),
            },
            ValuesAndRowNumbers::new_from_reader(arrow_reader)
        );

        Ok(())
    }

    #[derive(Debug, PartialEq)]
    struct ValuesAndRowNumbers {
        values: Vec<i64>,
        row_numbers: Vec<i64>,
    }
    impl ValuesAndRowNumbers {
        fn new_from_reader(reader: ParquetRecordBatchReader) -> Self {
            let mut values = vec![];
            let mut row_numbers = vec![];
            for batch in reader {
                let batch = batch.expect("Could not read batch");
                values.extend(
                    batch
                        .column_by_name("col")
                        .expect("Could not get col column")
                        .as_primitive::<arrow::datatypes::Int64Type>()
                        .iter()
                        .map(|v| v.expect("Could not get value")),
                );

                row_numbers.extend(
                    batch
                        .column_by_name("row_number")
                        .expect("Could not get row_number column")
                        .as_primitive::<arrow::datatypes::Int64Type>()
                        .iter()
                        .map(|v| v.expect("Could not get row number"))
                        .collect::<Vec<_>>(),
                );
            }
            Self {
                values,
                row_numbers,
            }
        }
    }

    #[test]
    #[should_panic(expected = "is not a virtual column")]
    fn test_with_virtual_columns_rejects_non_virtual_fields() {
        // ...
3 changes: 1 addition & 2 deletions parquet/src/arrow/schema/virtual_type.rs
@@ -77,8 +77,7 @@ impl ExtensionType for RowNumber {
pub fn is_virtual_column(field: &Field) -> bool {
    field
        .extension_type_name()
-        .map(|name| name.starts_with(VIRTUAL_PREFIX!()))
-        .unwrap_or(false)
+        .is_some_and(|name| name.starts_with(VIRTUAL_PREFIX!()))
}
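For context on this change: `Option::is_some_and` (stable since Rust 1.70) collapses the `.map(..).unwrap_or(false)` pattern into a single call. A standalone illustration, with literal strings standing in for the real extension-type names:

```rust
fn main() {
    let name: Option<&str> = Some("arrow.virtual.row_number");
    // The two forms below are equivalent for any predicate.
    let old_style = name.map(|n| n.starts_with("arrow.virtual.")).unwrap_or(false);
    let new_style = name.is_some_and(|n| n.starts_with("arrow.virtual."));
    assert_eq!(old_style, new_style);
    assert!(new_style);
    // None short-circuits to false in both forms.
    assert!(!None::<&str>.is_some_and(|n| n.starts_with("arrow.virtual.")));
}
```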

#[cfg(test)]
// ...