3 changes: 2 additions & 1 deletion crates/iceberg/src/arrow/mod.rs
@@ -28,7 +28,8 @@ pub mod delete_file_loader;
pub(crate) mod delete_filter;

mod reader;
-pub(crate) mod record_batch_projector;
+/// RecordBatch projection utilities
+pub mod record_batch_projector;
pub(crate) mod record_batch_transformer;
mod value;

71 changes: 68 additions & 3 deletions crates/iceberg/src/arrow/record_batch_projector.rs
@@ -20,13 +20,16 @@ use std::sync::Arc;
use arrow_array::{ArrayRef, RecordBatch, StructArray, make_array};
use arrow_buffer::NullBuffer;
use arrow_schema::{DataType, Field, FieldRef, Fields, Schema, SchemaRef};
+use parquet::arrow::PARQUET_FIELD_ID_META_KEY;

+use crate::arrow::schema::schema_to_arrow_schema;
use crate::error::Result;
+use crate::spec::Schema as IcebergSchema;
use crate::{Error, ErrorKind};

/// Help to project specific fields from `RecordBatch` according to the field ids.
-#[derive(Clone, Debug)]
-pub(crate) struct RecordBatchProjector {
+#[derive(Clone, Debug, PartialEq, Eq)]
+pub struct RecordBatchProjector {
// A vector of vectors, where each inner vector represents the index path to access a specific field in a nested structure.
// E.g. [[0], [1, 2]] means the first field is accessed directly from the first column,
// while the second field is accessed from the second column and then from its third subcolumn (second column must be a struct column).
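The index-path scheme in that comment is worth unpacking. A hedged editorial sketch (not part of the diff) of how a path such as `[1, 2]` would be walked, assuming the second top-level column is a struct:

    // field_indices = [[0], [1, 2]]
    // path [0]    -> take the first top-level column as-is
    // path [1, 2] -> take the second top-level column, downcast it to a
    //                StructArray, then take its third child
    let first: ArrayRef = batch[0].clone();
    let strct = batch[1].as_any().downcast_ref::<StructArray>().unwrap();
    let second: ArrayRef = strct.column(2).clone();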
@@ -77,6 +80,46 @@ impl RecordBatchProjector {
})
}

+    /// Create RecordBatchProjector using an Iceberg schema.
+    ///
+    /// This constructor converts the Iceberg schema to an Arrow schema with field ID metadata,
+    /// then uses the standard field ID lookup for projection.
+    ///
+    /// # Arguments
+    /// * `iceberg_schema` - The Iceberg schema for field ID mapping
+    /// * `target_field_ids` - The field IDs to project
+    pub fn from_iceberg_schema(
+        iceberg_schema: Arc<IcebergSchema>,
+        target_field_ids: &[i32],
+    ) -> Result<Self> {
+        let arrow_schema_with_ids = Arc::new(schema_to_arrow_schema(&iceberg_schema)?);
+
+        let field_id_fetch_func = |field: &Field| -> Result<Option<i64>> {
+            if let Some(value) = field.metadata().get(PARQUET_FIELD_ID_META_KEY) {
+                let field_id = value.parse::<i32>().map_err(|e| {
+                    Error::new(
+                        ErrorKind::DataInvalid,
+                        "Failed to parse field id".to_string(),
+                    )
+                    .with_context("value", value)
+                    .with_source(e)
+                })?;
+                Ok(Some(field_id as i64))
+            } else {
+                Ok(None)
+            }
+        };
+
+        let searchable_field_func = |_field: &Field| -> bool { true };
+
+        Self::new(
+            arrow_schema_with_ids,
+            target_field_ids,
+            field_id_fetch_func,
+            searchable_field_func,
+        )
+    }
+
fn fetch_field_index<F1, F2>(
fields: &Fields,
index_vec: &mut Vec<usize>,
@@ -129,7 +172,7 @@ impl RecordBatchProjector {
}

/// Do projection with columns
-    pub(crate) fn project_column(&self, batch: &[ArrayRef]) -> Result<Vec<ArrayRef>> {
+    pub fn project_column(&self, batch: &[ArrayRef]) -> Result<Vec<ArrayRef>> {
self.field_indices
.iter()
.map(|index_vec| Self::get_column_by_field_index(batch, index_vec))
@@ -166,6 +209,7 @@ mod test {
use arrow_schema::{DataType, Field, Fields, Schema};

use crate::arrow::record_batch_projector::RecordBatchProjector;
+    use crate::spec::{NestedField, PrimitiveType, Schema as IcebergSchema, Type};
use crate::{Error, ErrorKind};

#[test]
@@ -293,4 +337,25 @@
RecordBatchProjector::new(schema.clone(), &[3], field_id_fetch_func, |_| true);
assert!(projector.is_ok());
}
+
+    #[test]
+    fn test_from_iceberg_schema() {
+        let iceberg_schema = IcebergSchema::builder()
+            .with_schema_id(0)
+            .with_fields(vec![
+                NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
+                NestedField::required(2, "name", Type::Primitive(PrimitiveType::String)).into(),
+                NestedField::optional(3, "age", Type::Primitive(PrimitiveType::Int)).into(),
+            ])
+            .build()
+            .unwrap();
+
+        let projector =
+            RecordBatchProjector::from_iceberg_schema(Arc::new(iceberg_schema), &[1, 3]).unwrap();
+
+        assert_eq!(projector.field_indices.len(), 2);
+        assert_eq!(projector.projected_schema_ref().fields().len(), 2);
+        assert_eq!(projector.projected_schema_ref().field(0).name(), "id");
+        assert_eq!(projector.projected_schema_ref().field(1).name(), "age");
+    }
}
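To make the new public surface concrete, here is a hedged usage sketch (editorial, not part of the PR). It builds a two-field Iceberg schema, creates a projector for field id 1, and applies it to raw columns. The `iceberg::...` import paths and the top-level `Result` alias are assumptions inferred from the crate layout visible in this diff.

    use std::sync::Arc;

    use arrow_array::{ArrayRef, Int32Array, StringArray};
    use iceberg::Result;
    use iceberg::arrow::record_batch_projector::RecordBatchProjector;
    use iceberg::spec::{NestedField, PrimitiveType, Schema, Type};

    fn project_ids() -> Result<()> {
        let schema = Schema::builder()
            .with_schema_id(0)
            .with_fields(vec![
                NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
                NestedField::required(2, "name", Type::Primitive(PrimitiveType::String)).into(),
            ])
            .build()?;

        // Internally this converts the Iceberg schema to an Arrow schema whose
        // fields carry the PARQUET_FIELD_ID_META_KEY ("PARQUET:field_id")
        // metadata entry, then resolves the requested field ids against it.
        let projector = RecordBatchProjector::from_iceberg_schema(Arc::new(schema), &[1])?;

        let columns: Vec<ArrayRef> = vec![
            Arc::new(Int32Array::from(vec![1, 2, 3])),
            Arc::new(StringArray::from(vec!["a", "b", "c"])),
        ];
        // Only the `id` column (field id 1) survives the projection.
        let projected = projector.project_column(&columns)?;
        assert_eq!(projected.len(), 1);
        Ok(())
    }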
4 changes: 3 additions & 1 deletion crates/iceberg/src/transform/mod.rs
@@ -17,6 +17,8 @@

//! Transform function used to compute partition values.

+use std::fmt::Debug;
+
use arrow_array::ArrayRef;

use crate::spec::{Datum, Transform};
@@ -29,7 +31,7 @@ mod truncate;
mod void;

/// TransformFunction is a trait that defines the interface for all transform functions.
-pub trait TransformFunction: Send + Sync {
+pub trait TransformFunction: Send + Sync + Debug {
/// transform will take an input array and transform it into a new array.
/// The implementation of this function will need to check and downcast the input to a
/// specific type.
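The tightened bound means every implementor must now be `Debug`, presumably so that structs holding a `Box<dyn TransformFunction>` can derive `Debug` themselves. A hedged editorial sketch of what an implementor looks like under the new bound; the `transform_literal` signature is an assumption based on the `Datum` import above, and `crate::Result` stands in for the crate's error alias:

    use std::fmt::Debug;

    use arrow_array::ArrayRef;

    use crate::Result;
    use crate::spec::Datum;

    /// A no-op transform, used only to illustrate the new `Debug` requirement;
    /// without `#[derive(Debug)]` this impl would no longer compile.
    #[derive(Debug)]
    struct IdentityTransform;

    impl TransformFunction for IdentityTransform {
        fn transform(&self, input: ArrayRef) -> Result<ArrayRef> {
            Ok(input)
        }
        fn transform_literal(&self, input: &Datum) -> Result<Option<Datum>> {
            Ok(Some(input.clone()))
        }
    }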
2 changes: 2 additions & 0 deletions crates/integrations/datafusion/src/physical_plan/mod.rs
@@ -18,9 +18,11 @@
pub(crate) mod commit;
pub(crate) mod expr_to_predicate;
pub(crate) mod metadata_scan;
+pub(crate) mod project;
pub(crate) mod scan;
pub(crate) mod write;

pub(crate) const DATA_FILES_COL_NAME: &str = "data_files";

+pub use project::project_with_partition;
pub use scan::IcebergTableScan;