
Commit 6e5a871

feat: Read Parquet data file with projection (#245)

* feat: Read Parquet data file with projection
* fix
* Update
* More
* For review
* Use FeatureUnsupported error.

1 parent 162f16e commit 6e5a871
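
For context, a minimal usage sketch of the new projection path. This is not part of the commit: the module paths, the loaded `table` value, and the column names "x"/"z" are assumptions; the `scan().select(...)`, `build()`, and `to_arrow()` calls mirror the test added in crates/iceberg/src/scan.rs below.

// Sketch only, not part of this commit. Assumes `table: iceberg::table::Table`
// is already loaded and its schema has top-level primitive columns "x" and "z".
use futures::TryStreamExt;

async fn read_projected(table: &iceberg::table::Table) -> iceberg::Result<()> {
    // Select only "x" and "z"; nested or non-primitive columns are rejected
    // with ErrorKind::FeatureUnsupported by the checks added in scan.rs.
    let table_scan = table.scan().select(["x", "z"]).build()?;
    // The resulting Arrow record batches contain only the projected columns.
    let batch_stream = table_scan.to_arrow().await?;
    let batches: Vec<_> = batch_stream.try_collect().await?;
    assert_eq!(batches[0].num_columns(), 2);
    Ok(())
}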

File tree

2 files changed: +178 -22 lines changed

crates/iceberg/src/arrow.rs
crates/iceberg/src/scan.rs

crates/iceberg/src/arrow.rs

Lines changed: 89 additions & 10 deletions

@@ -17,12 +17,16 @@
 
 //! Parquet file data reader
 
+use arrow_schema::SchemaRef as ArrowSchemaRef;
 use async_stream::try_stream;
 use futures::stream::StreamExt;
-use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask};
+use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask, PARQUET_FIELD_ID_META_KEY};
+use parquet::schema::types::SchemaDescriptor;
+use std::collections::HashMap;
+use std::str::FromStr;
 
 use crate::io::FileIO;
-use crate::scan::{ArrowRecordBatchStream, FileScanTask, FileScanTaskStream};
+use crate::scan::{ArrowRecordBatchStream, FileScanTaskStream};
 use crate::spec::SchemaRef;
 
 use crate::error::Result;
@@ -36,6 +40,7 @@ use std::sync::Arc;
 /// Builder to create ArrowReader
 pub struct ArrowReaderBuilder {
     batch_size: Option<usize>,
+    field_ids: Vec<usize>,
     file_io: FileIO,
     schema: SchemaRef,
 }
@@ -45,6 +50,7 @@ impl ArrowReaderBuilder {
     pub fn new(file_io: FileIO, schema: SchemaRef) -> Self {
         ArrowReaderBuilder {
             batch_size: None,
+            field_ids: vec![],
             file_io,
             schema,
         }
@@ -57,10 +63,17 @@ impl ArrowReaderBuilder {
         self
     }
 
+    /// Sets the desired column projection with a list of field ids.
+    pub fn with_field_ids(mut self, field_ids: impl IntoIterator<Item = usize>) -> Self {
+        self.field_ids = field_ids.into_iter().collect();
+        self
+    }
+
     /// Build the ArrowReader.
     pub fn build(self) -> ArrowReader {
         ArrowReader {
             batch_size: self.batch_size,
+            field_ids: self.field_ids,
             schema: self.schema,
             file_io: self.file_io,
         }
@@ -70,6 +83,7 @@ impl ArrowReaderBuilder {
 /// Reads data from Parquet files
 pub struct ArrowReader {
     batch_size: Option<usize>,
+    field_ids: Vec<usize>,
     #[allow(dead_code)]
     schema: SchemaRef,
     file_io: FileIO,
@@ -83,17 +97,18 @@ impl ArrowReader {
 
         Ok(try_stream! {
             while let Some(Ok(task)) = tasks.next().await {
-
-                let projection_mask = self.get_arrow_projection_mask(&task);
-
                 let parquet_reader = file_io
                     .new_input(task.data().data_file().file_path())?
                     .reader()
                     .await?;
 
                 let mut batch_stream_builder = ParquetRecordBatchStreamBuilder::new(parquet_reader)
-                    .await?
-                    .with_projection(projection_mask);
+                    .await?;
+
+                let parquet_schema = batch_stream_builder.parquet_schema();
+                let arrow_schema = batch_stream_builder.schema();
+                let projection_mask = self.get_arrow_projection_mask(parquet_schema, arrow_schema)?;
+                batch_stream_builder = batch_stream_builder.with_projection(projection_mask);
 
                 if let Some(batch_size) = self.batch_size {
                     batch_stream_builder = batch_stream_builder.with_batch_size(batch_size);
@@ -109,9 +124,73 @@ impl ArrowReader {
             .boxed())
     }
 
-    fn get_arrow_projection_mask(&self, _task: &FileScanTask) -> ProjectionMask {
-        // TODO: full implementation
-        ProjectionMask::all()
+    fn get_arrow_projection_mask(
+        &self,
+        parquet_schema: &SchemaDescriptor,
+        arrow_schema: &ArrowSchemaRef,
+    ) -> crate::Result<ProjectionMask> {
+        if self.field_ids.is_empty() {
+            Ok(ProjectionMask::all())
+        } else {
+            // Build the map between field id and column index in Parquet schema.
+            let mut column_map = HashMap::new();
+
+            let fields = arrow_schema.fields();
+            let iceberg_schema = arrow_schema_to_schema(arrow_schema)?;
+            fields.filter_leaves(|idx, field| {
+                let field_id = field.metadata().get(PARQUET_FIELD_ID_META_KEY);
+                if field_id.is_none() {
+                    return false;
+                }
+
+                let field_id = i32::from_str(field_id.unwrap());
+                if field_id.is_err() {
+                    return false;
+                }
+                let field_id = field_id.unwrap();
+
+                if !self.field_ids.contains(&(field_id as usize)) {
+                    return false;
+                }
+
+                let iceberg_field = self.schema.field_by_id(field_id);
+                let parquet_iceberg_field = iceberg_schema.field_by_id(field_id);
+
+                if iceberg_field.is_none() || parquet_iceberg_field.is_none() {
+                    return false;
+                }
+
+                if iceberg_field.unwrap().field_type != parquet_iceberg_field.unwrap().field_type {
+                    return false;
+                }
+
+                column_map.insert(field_id, idx);
+                true
+            });
+
+            if column_map.len() != self.field_ids.len() {
+                return Err(Error::new(
+                    ErrorKind::DataInvalid,
+                    format!(
+                        "Parquet schema {} and Iceberg schema {} do not match.",
+                        iceberg_schema, self.schema
+                    ),
+                ));
+            }
+
+            let mut indices = vec![];
+            for field_id in &self.field_ids {
+                if let Some(col_idx) = column_map.get(&(*field_id as i32)) {
+                    indices.push(*col_idx);
+                } else {
+                    return Err(Error::new(
+                        ErrorKind::DataInvalid,
+                        format!("Field {} is not found in Parquet schema.", field_id),
+                    ));
+                }
+            }
+            Ok(ProjectionMask::leaves(parquet_schema, indices))
+        }
     }
 }
 
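For callers that drive the Arrow reader directly instead of going through TableScan::to_arrow, the new builder method slots in next to the existing batch-size option. A hedged sketch follows; the import paths are assumed from the use statements in the diff above, and the field ids 1 and 3 plus the `file_io`/`schema` arguments are placeholders, not values from this commit.

// Sketch only, not part of this commit.
use iceberg::arrow::{ArrowReader, ArrowReaderBuilder}; // path assumed from crates/iceberg/src/arrow.rs
use iceberg::io::FileIO;
use iceberg::spec::SchemaRef;

fn projected_reader(file_io: FileIO, schema: SchemaRef) -> ArrowReader {
    ArrowReaderBuilder::new(file_io, schema)
        // Field ids 1 and 3 are hypothetical top-level primitive fields; they are
        // resolved to Parquet leaf column indices via PARQUET_FIELD_ID_META_KEY
        // when get_arrow_projection_mask builds the ProjectionMask.
        .with_field_ids([1, 3])
        .with_batch_size(1024)
        .build()
}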

crates/iceberg/src/scan.rs

Lines changed: 89 additions & 12 deletions

@@ -109,7 +109,10 @@ impl<'a> TableScanBuilder<'a> {
             if schema.field_by_name(column_name).is_none() {
                 return Err(Error::new(
                     ErrorKind::DataInvalid,
-                    format!("Column {} not found in table.", column_name),
+                    format!(
+                        "Column {} not found in table. Schema: {}",
+                        column_name, schema
+                    ),
                 ));
             }
         }
@@ -187,6 +190,46 @@ impl TableScan {
         let mut arrow_reader_builder =
             ArrowReaderBuilder::new(self.file_io.clone(), self.schema.clone());
 
+        let mut field_ids = vec![];
+        for column_name in &self.column_names {
+            let field_id = self.schema.field_id_by_name(column_name).ok_or_else(|| {
+                Error::new(
+                    ErrorKind::DataInvalid,
+                    format!(
+                        "Column {} not found in table. Schema: {}",
+                        column_name, self.schema
+                    ),
+                )
+            })?;
+
+            let field = self.schema
+                .as_struct()
+                .field_by_id(field_id)
+                .ok_or_else(|| {
+                    Error::new(
+                        ErrorKind::FeatureUnsupported,
+                        format!(
+                            "Column {} is not a direct child of schema but a nested field, which is not supported now. Schema: {}",
+                            column_name, self.schema
+                        ),
+                    )
+                })?;
+
+            if !field.field_type.is_primitive() {
+                return Err(Error::new(
+                    ErrorKind::FeatureUnsupported,
+                    format!(
+                        "Column {} is not a primitive type. Schema: {}",
+                        column_name, self.schema
+                    ),
+                ));
+            }
+
+            field_ids.push(field_id as usize);
+        }
+
+        arrow_reader_builder = arrow_reader_builder.with_field_ids(field_ids);
+
         if let Some(batch_size) = self.batch_size {
             arrow_reader_builder = arrow_reader_builder.with_batch_size(batch_size);
         }
@@ -390,18 +433,29 @@ mod tests {
 
         // prepare data
         let schema = {
-            let fields =
-                vec![
-                    arrow_schema::Field::new("col", arrow_schema::DataType::Int64, true)
-                        .with_metadata(HashMap::from([(
-                            PARQUET_FIELD_ID_META_KEY.to_string(),
-                            "0".to_string(),
-                        )])),
-                ];
+            let fields = vec![
+                arrow_schema::Field::new("x", arrow_schema::DataType::Int64, false)
+                    .with_metadata(HashMap::from([(
+                        PARQUET_FIELD_ID_META_KEY.to_string(),
+                        "1".to_string(),
+                    )])),
+                arrow_schema::Field::new("y", arrow_schema::DataType::Int64, false)
+                    .with_metadata(HashMap::from([(
+                        PARQUET_FIELD_ID_META_KEY.to_string(),
+                        "2".to_string(),
+                    )])),
+                arrow_schema::Field::new("z", arrow_schema::DataType::Int64, false)
+                    .with_metadata(HashMap::from([(
+                        PARQUET_FIELD_ID_META_KEY.to_string(),
+                        "3".to_string(),
+                    )])),
+            ];
             Arc::new(arrow_schema::Schema::new(fields))
         };
-        let col = Arc::new(Int64Array::from_iter_values(vec![1; 1024])) as ArrayRef;
-        let to_write = RecordBatch::try_new(schema.clone(), vec![col]).unwrap();
+        let col1 = Arc::new(Int64Array::from_iter_values(vec![1; 1024])) as ArrayRef;
+        let col2 = Arc::new(Int64Array::from_iter_values(vec![2; 1024])) as ArrayRef;
+        let col3 = Arc::new(Int64Array::from_iter_values(vec![3; 1024])) as ArrayRef;
+        let to_write = RecordBatch::try_new(schema.clone(), vec![col1, col2, col3]).unwrap();
 
         // Write the Parquet files
         let props = WriterProperties::builder()
@@ -531,9 +585,32 @@ mod tests {
 
         let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
 
-        let col = batches[0].column_by_name("col").unwrap();
+        let col = batches[0].column_by_name("x").unwrap();
 
         let int64_arr = col.as_any().downcast_ref::<Int64Array>().unwrap();
         assert_eq!(int64_arr.value(0), 1);
     }
+
+    #[tokio::test]
+    async fn test_open_parquet_with_projection() {
+        let mut fixture = TableTestFixture::new();
+        fixture.setup_manifest_files().await;
+
+        // Create table scan for current snapshot and plan files
+        let table_scan = fixture.table.scan().select(["x", "z"]).build().unwrap();
+
+        let batch_stream = table_scan.to_arrow().await.unwrap();
+
+        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
+
+        assert_eq!(batches[0].num_columns(), 2);
+
+        let col1 = batches[0].column_by_name("x").unwrap();
+        let int64_arr = col1.as_any().downcast_ref::<Int64Array>().unwrap();
+        assert_eq!(int64_arr.value(0), 1);
+
+        let col2 = batches[0].column_by_name("z").unwrap();
+        let int64_arr = col2.as_any().downcast_ref::<Int64Array>().unwrap();
+        assert_eq!(int64_arr.value(0), 3);
+    }
 }
