From ae85d633d20efa44d96d8414b8c56cd5d8b024de Mon Sep 17 00:00:00 2001 From: Vukasin Stefanovic Date: Mon, 27 Oct 2025 12:15:22 +0100 Subject: [PATCH 1/4] propagate metadata column; integrate it for _pos; package iceberg builds at arrow-rs 57.0 --- Cargo.lock | 339 +++++++++++++----- Cargo.toml | 18 +- .../iceberg/src/arrow/delete_file_loader.rs | 1 + crates/iceberg/src/arrow/reader.rs | 53 ++- .../src/expr/visitors/page_index_evaluator.rs | 217 +++++------ .../src/writer/file_writer/parquet_writer.rs | 9 +- 6 files changed, 402 insertions(+), 235 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index e27e05129d..de9a35bea6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -224,19 +224,19 @@ version = "55.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f3f15b4c6b148206ff3a2b35002e08929c2462467b62b9c02036d9c34f9ef994" dependencies = [ - "arrow-arith", - "arrow-array", - "arrow-buffer", - "arrow-cast", + "arrow-arith 55.2.0", + "arrow-array 55.2.0", + "arrow-buffer 55.2.0", + "arrow-cast 55.2.0", "arrow-csv", - "arrow-data", - "arrow-ipc", + "arrow-data 55.2.0", + "arrow-ipc 55.2.0", "arrow-json", - "arrow-ord", + "arrow-ord 55.2.0", "arrow-row", - "arrow-schema", - "arrow-select", - "arrow-string", + "arrow-schema 55.2.0", + "arrow-select 55.2.0", + "arrow-string 55.2.0", ] [[package]] @@ -245,14 +245,27 @@ version = "55.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "30feb679425110209ae35c3fbf82404a39a4c0436bb3ec36164d8bffed2a4ce4" dependencies = [ - "arrow-array", - "arrow-buffer", - "arrow-data", - "arrow-schema", + "arrow-array 55.2.0", + "arrow-buffer 55.2.0", + "arrow-data 55.2.0", + "arrow-schema 55.2.0", "chrono", "num", ] +[[package]] +name = "arrow-arith" +version = "57.0.0" +source = "git+https://github.com/vustef/arrow-rs?branch=feature%2Fparquet-virtual-row-numbers#6144967b99c077376555f186c1104479abd97934" +dependencies = [ + "arrow-array 57.0.0", + "arrow-buffer 57.0.0", + "arrow-data 57.0.0", + "arrow-schema 57.0.0", + "chrono", + "num-traits", +] + [[package]] name = "arrow-array" version = "55.2.0" @@ -260,9 +273,9 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "70732f04d285d49054a48b72c54f791bb3424abae92d27aafdf776c98af161c8" dependencies = [ "ahash 0.8.12", - "arrow-buffer", - "arrow-data", - "arrow-schema", + "arrow-buffer 55.2.0", + "arrow-data 55.2.0", + "arrow-schema 55.2.0", "chrono", "chrono-tz 0.10.4", "half", @@ -270,6 +283,23 @@ dependencies = [ "num", ] +[[package]] +name = "arrow-array" +version = "57.0.0" +source = "git+https://github.com/vustef/arrow-rs?branch=feature%2Fparquet-virtual-row-numbers#6144967b99c077376555f186c1104479abd97934" +dependencies = [ + "ahash 0.8.12", + "arrow-buffer 57.0.0", + "arrow-data 57.0.0", + "arrow-schema 57.0.0", + "chrono", + "half", + "hashbrown 0.16.0", + "num-complex", + "num-integer", + "num-traits", +] + [[package]] name = "arrow-buffer" version = "55.2.0" @@ -281,17 +311,28 @@ dependencies = [ "num", ] +[[package]] +name = "arrow-buffer" +version = "57.0.0" +source = "git+https://github.com/vustef/arrow-rs?branch=feature%2Fparquet-virtual-row-numbers#6144967b99c077376555f186c1104479abd97934" +dependencies = [ + "bytes", + "half", + "num-bigint", + "num-traits", +] + [[package]] name = "arrow-cast" version = "55.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e4f12eccc3e1c05a766cafb31f6a60a46c2f8efec9b74c6e0648766d30686af8" dependencies = [ - "arrow-array", - "arrow-buffer", - "arrow-data", - 
"arrow-schema", - "arrow-select", + "arrow-array 55.2.0", + "arrow-buffer 55.2.0", + "arrow-data 55.2.0", + "arrow-schema 55.2.0", + "arrow-select 55.2.0", "atoi", "base64 0.22.1", "chrono", @@ -302,15 +343,34 @@ dependencies = [ "ryu", ] +[[package]] +name = "arrow-cast" +version = "57.0.0" +source = "git+https://github.com/vustef/arrow-rs?branch=feature%2Fparquet-virtual-row-numbers#6144967b99c077376555f186c1104479abd97934" +dependencies = [ + "arrow-array 57.0.0", + "arrow-buffer 57.0.0", + "arrow-data 57.0.0", + "arrow-schema 57.0.0", + "arrow-select 57.0.0", + "atoi", + "base64 0.22.1", + "chrono", + "half", + "lexical-core", + "num-traits", + "ryu", +] + [[package]] name = "arrow-csv" version = "55.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "012c9fef3f4a11573b2c74aec53712ff9fdae4a95f4ce452d1bbf088ee00f06b" dependencies = [ - "arrow-array", - "arrow-cast", - "arrow-schema", + "arrow-array 55.2.0", + "arrow-cast 55.2.0", + "arrow-schema 55.2.0", "chrono", "csv", "csv-core", @@ -323,37 +383,62 @@ version = "55.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8de1ce212d803199684b658fc4ba55fb2d7e87b213de5af415308d2fee3619c2" dependencies = [ - "arrow-buffer", - "arrow-schema", + "arrow-buffer 55.2.0", + "arrow-schema 55.2.0", "half", "num", ] +[[package]] +name = "arrow-data" +version = "57.0.0" +source = "git+https://github.com/vustef/arrow-rs?branch=feature%2Fparquet-virtual-row-numbers#6144967b99c077376555f186c1104479abd97934" +dependencies = [ + "arrow-buffer 57.0.0", + "arrow-schema 57.0.0", + "half", + "num-integer", + "num-traits", +] + [[package]] name = "arrow-ipc" version = "55.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d9ea5967e8b2af39aff5d9de2197df16e305f47f404781d3230b2dc672da5d92" dependencies = [ - "arrow-array", - "arrow-buffer", - "arrow-data", - "arrow-schema", + "arrow-array 55.2.0", + "arrow-buffer 55.2.0", + "arrow-data 55.2.0", + "arrow-schema 55.2.0", "flatbuffers", "lz4_flex", ] +[[package]] +name = "arrow-ipc" +version = "57.0.0" +source = "git+https://github.com/vustef/arrow-rs?branch=feature%2Fparquet-virtual-row-numbers#6144967b99c077376555f186c1104479abd97934" +dependencies = [ + "arrow-array 57.0.0", + "arrow-buffer 57.0.0", + "arrow-data 57.0.0", + "arrow-schema 57.0.0", + "arrow-select 57.0.0", + "flatbuffers", +] + [[package]] name = "arrow-json" version = "55.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5709d974c4ea5be96d900c01576c7c0b99705f4a3eec343648cb1ca863988a9c" dependencies = [ - "arrow-array", - "arrow-buffer", - "arrow-cast", - "arrow-data", - "arrow-schema", + "arrow-array 55.2.0", + "arrow-buffer 55.2.0", + "arrow-cast 55.2.0", + "arrow-data 55.2.0", + "arrow-schema 55.2.0", "chrono", "half", "indexmap 2.12.0", @@ -371,11 +456,23 @@ version = "55.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6506e3a059e3be23023f587f79c82ef0bcf6d293587e3272d20f2d30b969b5a7" dependencies = [ - "arrow-array", - "arrow-buffer", - "arrow-data", - "arrow-schema", - "arrow-select", + "arrow-array 55.2.0", + "arrow-buffer 55.2.0", + "arrow-data 55.2.0", + "arrow-schema 55.2.0", + "arrow-select 55.2.0", +] + +[[package]] +name = "arrow-ord" +version = "57.0.0" +source = "git+https://github.com/vustef/arrow-rs?branch=feature%2Fparquet-virtual-row-numbers#6144967b99c077376555f186c1104479abd97934" +dependencies = [ + "arrow-array 57.0.0", + "arrow-buffer 57.0.0", + "arrow-data 57.0.0", + 
"arrow-schema 57.0.0", + "arrow-select 57.0.0", ] [[package]] @@ -384,10 +481,10 @@ version = "55.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "52bf7393166beaf79b4bed9bfdf19e97472af32ce5b6b48169d321518a08cae2" dependencies = [ - "arrow-array", - "arrow-buffer", - "arrow-data", - "arrow-schema", + "arrow-array 55.2.0", + "arrow-buffer 55.2.0", + "arrow-data 55.2.0", + "arrow-schema 55.2.0", "half", ] @@ -401,6 +498,11 @@ dependencies = [ "serde_json", ] +[[package]] +name = "arrow-schema" +version = "57.0.0" +source = "git+https://github.com/vustef/arrow-rs?branch=feature%2Fparquet-virtual-row-numbers#6144967b99c077376555f186c1104479abd97934" + [[package]] name = "arrow-select" version = "55.2.0" @@ -408,30 +510,59 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dd2b45757d6a2373faa3352d02ff5b54b098f5e21dccebc45a21806bc34501e5" dependencies = [ "ahash 0.8.12", - "arrow-array", - "arrow-buffer", - "arrow-data", - "arrow-schema", + "arrow-array 55.2.0", + "arrow-buffer 55.2.0", + "arrow-data 55.2.0", + "arrow-schema 55.2.0", "num", ] +[[package]] +name = "arrow-select" +version = "57.0.0" +source = "git+https://github.com/vustef/arrow-rs?branch=feature%2Fparquet-virtual-row-numbers#6144967b99c077376555f186c1104479abd97934" +dependencies = [ + "ahash 0.8.12", + "arrow-array 57.0.0", + "arrow-buffer 57.0.0", + "arrow-data 57.0.0", + "arrow-schema 57.0.0", + "num-traits", +] + [[package]] name = "arrow-string" version = "55.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0377d532850babb4d927a06294314b316e23311503ed580ec6ce6a0158f49d40" dependencies = [ - "arrow-array", - "arrow-buffer", - "arrow-data", - "arrow-schema", - "arrow-select", + "arrow-array 55.2.0", + "arrow-buffer 55.2.0", + "arrow-data 55.2.0", + "arrow-schema 55.2.0", + "arrow-select 55.2.0", "memchr", "num", "regex", "regex-syntax", ] +[[package]] +name = "arrow-string" +version = "57.0.0" +source = "git+https://github.com/vustef/arrow-rs?branch=feature%2Fparquet-virtual-row-numbers#6144967b99c077376555f186c1104479abd97934" +dependencies = [ + "arrow-array 57.0.0", + "arrow-buffer 57.0.0", + "arrow-data 57.0.0", + "arrow-schema 57.0.0", + "arrow-select 57.0.0", + "memchr", + "num-traits", + "regex", + "regex-syntax", +] + [[package]] name = "as-any" version = "0.3.2" @@ -1870,8 +2001,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8a11e19a7ccc5bb979c95c1dceef663eab39c9061b3bbf8d1937faf0f03bf41f" dependencies = [ "arrow", - "arrow-ipc", - "arrow-schema", + "arrow-ipc 55.2.0", + "arrow-schema 55.2.0", "async-trait", "bytes", "bzip2 0.5.2", @@ -1906,7 +2037,7 @@ dependencies = [ "log", "object_store", "parking_lot", - "parquet", + "parquet 55.2.0", "rand 0.9.2", "regex", "sqlparser", @@ -1985,7 +2116,7 @@ dependencies = [ "mimalloc", "object_store", "parking_lot", - "parquet", + "parquet 55.2.0", "regex", "rustyline", "tokio", @@ -2001,7 +2132,7 @@ dependencies = [ "ahash 0.8.12", "apache-avro 0.17.0", "arrow", - "arrow-ipc", + "arrow-ipc 55.2.0", "base64 0.22.1", "half", "hashbrown 0.14.5", @@ -2009,7 +2140,7 @@ dependencies = [ "libc", "log", "object_store", - "parquet", + "parquet 55.2.0", "paste", "recursive", "sqlparser", @@ -2054,7 +2185,7 @@ dependencies = [ "itertools 0.14.0", "log", "object_store", - "parquet", + "parquet 55.2.0", "rand 0.9.2", "tempfile", "tokio", @@ -2165,7 +2296,7 @@ dependencies = [ "log", "object_store", "parking_lot", - "parquet", + "parquet 55.2.0", "rand 0.9.2", 
"tokio", ] @@ -2236,7 +2367,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cdf9a9cf655265861a20453b1e58357147eab59bdc90ce7f2f68f1f35104d3bb" dependencies = [ "arrow", - "arrow-buffer", + "arrow-buffer 55.2.0", "base64 0.22.1", "blake2", "blake3", @@ -2299,7 +2430,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2ab331806e34f5545e5f03396e4d5068077395b1665795d8f88c14ec4f1e0b7a" dependencies = [ "arrow", - "arrow-ord", + "arrow-ord 55.2.0", "datafusion-common", "datafusion-doc", "datafusion-execution", @@ -2450,8 +2581,8 @@ checksum = "b46cbdf21a01206be76d467f325273b22c559c744a012ead5018dfe79597de08" dependencies = [ "ahash 0.8.12", "arrow", - "arrow-ord", - "arrow-schema", + "arrow-ord 55.2.0", + "arrow-schema 55.2.0", "async-trait", "chrono", "datafusion-common", @@ -3558,14 +3689,14 @@ dependencies = [ "anyhow", "apache-avro 0.20.0", "array-init", - "arrow-arith", - "arrow-array", - "arrow-buffer", - "arrow-cast", - "arrow-ord", - "arrow-schema", - "arrow-select", - "arrow-string", + "arrow-arith 57.0.0", + "arrow-array 57.0.0", + "arrow-buffer 57.0.0", + "arrow-cast 57.0.0", + "arrow-ord 57.0.0", + "arrow-schema 57.0.0", + "arrow-select 57.0.0", + "arrow-string 57.0.0", "as-any", "async-trait", "backon", @@ -3587,7 +3718,7 @@ dependencies = [ "once_cell", "opendal", "ordered-float 4.6.0", - "parquet", + "parquet 57.0.0", "pretty_assertions", "rand 0.8.5", "regex", @@ -3738,7 +3869,7 @@ dependencies = [ "expect-test", "futures", "iceberg", - "parquet", + "parquet 57.0.0", "tempfile", "tokio", "uuid", @@ -3758,8 +3889,8 @@ dependencies = [ name = "iceberg-integration-tests" version = "0.7.0" dependencies = [ - "arrow-array", - "arrow-schema", + "arrow-array 57.0.0", + "arrow-schema 57.0.0", "ctor", "datafusion", "futures", @@ -3768,7 +3899,7 @@ dependencies = [ "iceberg-datafusion", "iceberg_test_utils", "ordered-float 2.10.1", - "parquet", + "parquet 57.0.0", "tokio", "uuid", ] @@ -3783,6 +3914,7 @@ dependencies = [ "datafusion-cli", "dirs", "fs-err", + "home", "iceberg", "iceberg-catalog-rest", "iceberg-datafusion", @@ -4917,13 +5049,13 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b17da4150748086bd43352bc77372efa9b6e3dbd06a04831d2a98c041c225cfa" dependencies = [ "ahash 0.8.12", - "arrow-array", - "arrow-buffer", - "arrow-cast", - "arrow-data", - "arrow-ipc", - "arrow-schema", - "arrow-select", + "arrow-array 55.2.0", + "arrow-buffer 55.2.0", + "arrow-cast 55.2.0", + "arrow-data 55.2.0", + "arrow-ipc 55.2.0", + "arrow-schema 55.2.0", + "arrow-select 55.2.0", "base64 0.22.1", "brotli", "bytes", @@ -4946,6 +5078,41 @@ dependencies = [ "zstd", ] +[[package]] +name = "parquet" +version = "57.0.0" +source = "git+https://github.com/vustef/arrow-rs?branch=feature%2Fparquet-virtual-row-numbers#6144967b99c077376555f186c1104479abd97934" +dependencies = [ + "ahash 0.8.12", + "arrow-array 57.0.0", + "arrow-buffer 57.0.0", + "arrow-cast 57.0.0", + "arrow-data 57.0.0", + "arrow-ipc 57.0.0", + "arrow-schema 57.0.0", + "arrow-select 57.0.0", + "base64 0.22.1", + "brotli", + "bytes", + "chrono", + "flate2", + "futures", + "half", + "hashbrown 0.16.0", + "lz4_flex", + "num-bigint", + "num-integer", + "num-traits", + "paste", + "seq-macro", + "simdutf8", + "snap", + "thrift", + "tokio", + "twox-hash", + "zstd", +] + [[package]] name = "parse-zoneinfo" version = "0.3.1" diff --git a/Cargo.toml b/Cargo.toml index 1c007376ad..2068acf81a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -42,14 +42,14 @@ rust-version 
= "1.87" anyhow = "1.0.72" apache-avro = { version = "0.20", features = ["zstandard"] } array-init = "2" -arrow-arith = { version = "55.1" } -arrow-array = { version = "55.1" } -arrow-buffer = { version = "55.1" } -arrow-cast = { version = "55.1" } -arrow-ord = { version = "55.1" } -arrow-schema = { version = "55.1" } -arrow-select = { version = "55.1" } -arrow-string = { version = "55.1" } +arrow-arith = { git = "https://github.com/vustef/arrow-rs", branch = "feature/parquet-virtual-row-numbers" } +arrow-array = { git = "https://github.com/vustef/arrow-rs", branch = "feature/parquet-virtual-row-numbers" } +arrow-buffer = { git = "https://github.com/vustef/arrow-rs", branch = "feature/parquet-virtual-row-numbers" } +arrow-cast = { git = "https://github.com/vustef/arrow-rs", branch = "feature/parquet-virtual-row-numbers" } +arrow-ord = { git = "https://github.com/vustef/arrow-rs", branch = "feature/parquet-virtual-row-numbers" } +arrow-schema = { git = "https://github.com/vustef/arrow-rs", branch = "feature/parquet-virtual-row-numbers" } +arrow-select = { git = "https://github.com/vustef/arrow-rs", branch = "feature/parquet-virtual-row-numbers" } +arrow-string = { git = "https://github.com/vustef/arrow-rs", branch = "feature/parquet-virtual-row-numbers" } as-any = "0.3.2" async-trait = "0.1.88" aws-config = "1.8.1" @@ -99,7 +99,7 @@ num-bigint = "0.4.6" once_cell = "1.20" opendal = "0.54.0" ordered-float = "4" -parquet = "55.1" +parquet = { git = "https://github.com/vustef/arrow-rs", branch = "feature/parquet-virtual-row-numbers" } pilota = "0.11.10" port_scanner = "0.1.5" pretty_assertions = "1.4" diff --git a/crates/iceberg/src/arrow/delete_file_loader.rs b/crates/iceberg/src/arrow/delete_file_loader.rs index 592ef2eb4a..22ab70ba98 100644 --- a/crates/iceberg/src/arrow/delete_file_loader.rs +++ b/crates/iceberg/src/arrow/delete_file_loader.rs @@ -63,6 +63,7 @@ impl BasicDeleteFileLoader { data_file_path, self.file_io.clone(), false, + vec![], ) .await? .build()? 
diff --git a/crates/iceberg/src/arrow/reader.rs b/crates/iceberg/src/arrow/reader.rs
index ff4cff0a64..46ccd80d7c 100644
--- a/crates/iceberg/src/arrow/reader.rs
+++ b/crates/iceberg/src/arrow/reader.rs
@@ -38,8 +38,8 @@ use parquet::arrow::arrow_reader::{
     ArrowPredicateFn, ArrowReaderOptions, RowFilter, RowSelection, RowSelector,
 };
 use parquet::arrow::async_reader::AsyncFileReader;
-use parquet::arrow::{PARQUET_FIELD_ID_META_KEY, ParquetRecordBatchStreamBuilder, ProjectionMask};
-use parquet::file::metadata::{ParquetMetaData, ParquetMetaDataReader, RowGroupMetaData};
+use parquet::arrow::{PARQUET_FIELD_ID_META_KEY, ParquetRecordBatchStreamBuilder, ProjectionMask, RowNumber};
+use parquet::file::metadata::{PageIndexPolicy, ParquetMetaData, ParquetMetaDataReader, RowGroupMetaData};
 use parquet::schema::types::{SchemaDescriptor, Type as ParquetType};
 
 use crate::arrow::caching_delete_file_loader::CachingDeleteFileLoader;
@@ -57,6 +57,12 @@ use crate::spec::{Datum, NestedField, PrimitiveType, Schema, Type};
 use crate::utils::available_parallelism;
 use crate::{Error, ErrorKind};
 
+// Reserved field ID for the row ordinal (_pos) column per Iceberg spec
+pub(crate) const RESERVED_FIELD_ID_POS: i32 = 2147483645;
+
+/// Column name for the row ordinal metadata column per Iceberg spec
+pub(crate) const RESERVED_COL_NAME_POS: &str = "_pos";
+
 /// Builder to create ArrowReader
 pub struct ArrowReaderBuilder {
     batch_size: Option<usize>,
@@ -64,6 +70,7 @@ pub struct ArrowReaderBuilder {
     concurrency_limit_data_files: usize,
     row_group_filtering_enabled: bool,
     row_selection_enabled: bool,
+    metadata_columns: Vec<String>,
 }
 
 impl ArrowReaderBuilder {
@@ -77,6 +84,7 @@ impl ArrowReaderBuilder {
             concurrency_limit_data_files: num_cpus,
             row_group_filtering_enabled: true,
             row_selection_enabled: false,
+            metadata_columns: vec![],
         }
     }
 
@@ -105,6 +113,15 @@ impl ArrowReaderBuilder {
         self
     }
 
+    /// Sets the metadata columns to include in the result.
+    ///
+    /// Metadata columns are virtual columns that provide metadata about the rows,
+    /// such as file paths or row positions. These are defined in the Iceberg spec: https://iceberg.apache.org/spec/#identifier-field-ids
+    pub fn with_metadata_columns(mut self, metadata_columns: Vec<String>) -> Self {
+        self.metadata_columns = metadata_columns;
+        self
+    }
+
     /// Build the ArrowReader.
    pub fn build(self) -> ArrowReader {
         ArrowReader {
@@ -117,6 +134,7 @@ impl ArrowReaderBuilder {
             concurrency_limit_data_files: self.concurrency_limit_data_files,
             row_group_filtering_enabled: self.row_group_filtering_enabled,
             row_selection_enabled: self.row_selection_enabled,
+            metadata_columns: self.metadata_columns,
         }
     }
 }
@@ -133,6 +151,7 @@ pub struct ArrowReader {
 
     row_group_filtering_enabled: bool,
     row_selection_enabled: bool,
+    metadata_columns: Vec<String>,
 }
 
 impl ArrowReader {
@@ -156,6 +175,7 @@ impl ArrowReader {
                     self.delete_file_loader.clone(),
                     row_group_filtering_enabled,
                     row_selection_enabled,
+                    self.metadata_columns.clone(),
                 )
             })
             .map_err(|err| {
@@ -175,16 +195,36 @@ impl ArrowReader {
         delete_file_loader: CachingDeleteFileLoader,
         row_group_filtering_enabled: bool,
         row_selection_enabled: bool,
+        metadata_columns: Vec<String>,
     ) -> Result<ArrowRecordBatchStream> {
         let should_load_page_index =
             (row_selection_enabled && task.predicate.is_some()) || !task.deletes.is_empty();
 
         let delete_filter_rx = delete_file_loader.load_deletes(&task.deletes, task.schema.clone());
 
+        let mut virtual_columns: Vec<arrow_schema::Field> = vec![];
+        for metadata_column in metadata_columns {
+            if metadata_column == RESERVED_COL_NAME_POS {
+                let row_number_field = arrow_schema::Field::new(metadata_column, arrow_schema::DataType::Int64, false)
+                    .with_extension_type(RowNumber)
+                    .with_metadata(std::collections::HashMap::from([(
+                        PARQUET_FIELD_ID_META_KEY.to_string(),
+                        RESERVED_FIELD_ID_POS.to_string(),
+                    )]));
+                virtual_columns.push(row_number_field);
+            } else {
+                return Err(Error::new(
+                    ErrorKind::FeatureUnsupported, // TODO @vustef: This is appropriate only for columns from the Iceberg spec.
+                    format!("Metadata column '{}' not supported", metadata_column),
+                ));
+            }
+        }
+
         let mut record_batch_stream_builder = Self::create_parquet_record_batch_stream_builder(
             &task.data_file_path,
             file_io.clone(),
             should_load_page_index,
+            virtual_columns,
         )
         .await?;
 
@@ -327,6 +367,7 @@ impl ArrowReader {
         data_file_path: &str,
         file_io: FileIO,
         should_load_page_index: bool,
+        virtual_columns: Vec<arrow_schema::Field>,
     ) -> Result<ParquetRecordBatchStreamBuilder<ArrowFileReader>> {
         // Get the metadata for the Parquet file we need to read and build
         // a reader for the data within
@@ -341,7 +382,7 @@ impl ArrowReader {
         // Create the record batch stream builder, which wraps the parquet file reader
         let record_batch_stream_builder = ParquetRecordBatchStreamBuilder::new_with_options(
             parquet_file_reader,
-            ArrowReaderOptions::new(),
+            ArrowReaderOptions::new().with_virtual_columns(virtual_columns),
         )
         .await?;
         Ok(record_batch_stream_builder)
@@ -1380,9 +1421,9 @@ impl AsyncFileReader for ArrowFileReader {
         async move {
             let reader = ParquetMetaDataReader::new()
                 .with_prefetch_hint(self.metadata_size_hint)
-                .with_column_indexes(self.preload_column_index)
-                .with_page_indexes(self.preload_page_index)
-                .with_offset_indexes(self.preload_offset_index);
+                .with_column_index_policy(PageIndexPolicy::from(self.preload_column_index))
+                .with_page_index_policy(PageIndexPolicy::from(self.preload_page_index))
+                .with_offset_index_policy(PageIndexPolicy::from(self.preload_offset_index));
 
             let size = self.meta.size;
             let meta = reader.load_and_finish(self, size).await?;
diff --git a/crates/iceberg/src/expr/visitors/page_index_evaluator.rs b/crates/iceberg/src/expr/visitors/page_index_evaluator.rs
index ea56c32c66..bb883e4ab4 100644
--- a/crates/iceberg/src/expr/visitors/page_index_evaluator.rs
+++ b/crates/iceberg/src/expr/visitors/page_index_evaluator.rs
@@ -23,7 +23,7 @@ use fnv::FnvHashSet;
 use ordered_float::OrderedFloat;
 use parquet::arrow::arrow_reader::{RowSelection,
RowSelector}; use parquet::file::metadata::RowGroupMetaData; -use parquet::file::page_index::index::Index; +use parquet::file::page_index::column_index::ColumnIndexMetaData as Index; use parquet::file::page_index::offset_index::OffsetIndexMetaData; use crate::expr::visitors::bound_predicate_visitor::{BoundPredicateVisitor, visit}; @@ -253,117 +253,105 @@ impl<'a> PageIndexEvaluator<'a> { Index::NONE => { return Ok(None); } - Index::BOOLEAN(idx) => idx - .indexes - .iter() + Index::BOOLEAN(idx) => (0..idx.num_pages() as usize) .zip(row_counts.iter()) - .map(|(item, &row_count)| { + .map(|(page_idx, &row_count)| { predicate( - item.min.map(|val| { + idx.min_value(page_idx).map(|&val| { Datum::new(field_type.clone(), PrimitiveLiteral::Boolean(val)) }), - item.max.map(|val| { + idx.max_value(page_idx).map(|&val| { Datum::new(field_type.clone(), PrimitiveLiteral::Boolean(val)) }), - PageNullCount::from_row_and_null_counts(row_count, item.null_count), + PageNullCount::from_row_and_null_counts(row_count, idx.null_count(page_idx)), ) }) .collect(), - Index::INT32(idx) => idx - .indexes - .iter() + Index::INT32(idx) => (0..idx.num_pages() as usize) .zip(row_counts.iter()) - .map(|(item, &row_count)| { + .map(|(page_idx, &row_count)| { predicate( - item.min - .map(|val| Datum::new(field_type.clone(), PrimitiveLiteral::Int(val))), - item.max - .map(|val| Datum::new(field_type.clone(), PrimitiveLiteral::Int(val))), - PageNullCount::from_row_and_null_counts(row_count, item.null_count), + idx.min_value(page_idx) + .map(|&val| Datum::new(field_type.clone(), PrimitiveLiteral::Int(val))), + idx.max_value(page_idx) + .map(|&val| Datum::new(field_type.clone(), PrimitiveLiteral::Int(val))), + PageNullCount::from_row_and_null_counts(row_count, idx.null_count(page_idx)), ) }) .collect(), - Index::INT64(idx) => idx - .indexes - .iter() + Index::INT64(idx) => (0..idx.num_pages() as usize) .zip(row_counts.iter()) - .map(|(item, &row_count)| { + .map(|(page_idx, &row_count)| { predicate( - item.min - .map(|val| Datum::new(field_type.clone(), PrimitiveLiteral::Long(val))), - item.max - .map(|val| Datum::new(field_type.clone(), PrimitiveLiteral::Long(val))), - PageNullCount::from_row_and_null_counts(row_count, item.null_count), + idx.min_value(page_idx) + .map(|&val| Datum::new(field_type.clone(), PrimitiveLiteral::Long(val))), + idx.max_value(page_idx) + .map(|&val| Datum::new(field_type.clone(), PrimitiveLiteral::Long(val))), + PageNullCount::from_row_and_null_counts(row_count, idx.null_count(page_idx)), ) }) .collect(), - Index::FLOAT(idx) => idx - .indexes - .iter() + Index::FLOAT(idx) => (0..idx.num_pages() as usize) .zip(row_counts.iter()) - .map(|(item, &row_count)| { + .map(|(page_idx, &row_count)| { predicate( - item.min.map(|val| { + idx.min_value(page_idx).map(|&val| { Datum::new( field_type.clone(), PrimitiveLiteral::Float(OrderedFloat::from(val)), ) }), - item.max.map(|val| { + idx.max_value(page_idx).map(|&val| { Datum::new( field_type.clone(), PrimitiveLiteral::Float(OrderedFloat::from(val)), ) }), - PageNullCount::from_row_and_null_counts(row_count, item.null_count), + PageNullCount::from_row_and_null_counts(row_count, idx.null_count(page_idx)), ) }) .collect(), - Index::DOUBLE(idx) => idx - .indexes - .iter() + Index::DOUBLE(idx) => (0..idx.num_pages() as usize) .zip(row_counts.iter()) - .map(|(item, &row_count)| { + .map(|(page_idx, &row_count)| { predicate( - item.min.map(|val| { + idx.min_value(page_idx).map(|&val| { Datum::new( field_type.clone(), 
PrimitiveLiteral::Double(OrderedFloat::from(val)), ) }), - item.max.map(|val| { + idx.max_value(page_idx).map(|&val| { Datum::new( field_type.clone(), PrimitiveLiteral::Double(OrderedFloat::from(val)), ) }), - PageNullCount::from_row_and_null_counts(row_count, item.null_count), + PageNullCount::from_row_and_null_counts(row_count, idx.null_count(page_idx)), ) }) .collect(), - Index::BYTE_ARRAY(idx) => idx - .indexes - .iter() + Index::BYTE_ARRAY(idx) => (0..idx.num_pages() as usize) .zip(row_counts.iter()) - .map(|(item, &row_count)| { + .map(|(page_idx, &row_count)| { predicate( - item.min.clone().map(|val| { + idx.min_value(page_idx).map(|val| { Datum::new( field_type.clone(), PrimitiveLiteral::String( - String::from_utf8(val.data().to_vec()).unwrap(), + String::from_utf8(val.to_vec()).unwrap(), ), ) }), - item.max.clone().map(|val| { + idx.max_value(page_idx).map(|val| { Datum::new( field_type.clone(), PrimitiveLiteral::String( - String::from_utf8(val.data().to_vec()).unwrap(), + String::from_utf8(val.to_vec()).unwrap(), ), ) }), - PageNullCount::from_row_and_null_counts(row_count, item.null_count), + PageNullCount::from_row_and_null_counts(row_count, idx.null_count(page_idx)), ) }) .collect(), @@ -791,13 +779,13 @@ mod tests { use std::sync::Arc; use parquet::arrow::arrow_reader::RowSelector; - use parquet::basic::{LogicalType as ParquetLogicalType, Type as ParquetPhysicalType}; + use parquet::basic::{BoundaryOrder, LogicalType as ParquetLogicalType, Type as ParquetPhysicalType}; use parquet::data_type::ByteArray; use parquet::file::metadata::{ColumnChunkMetaData, RowGroupMetaData}; - use parquet::file::page_index::index::{Index, NativeIndex, PageIndex}; + use parquet::file::page_index::column_index::{ColumnIndexMetaData as Index, PrimitiveColumnIndex as NativeIndex}; use parquet::file::page_index::offset_index::OffsetIndexMetaData; use parquet::file::statistics::Statistics; - use parquet::format::{BoundaryOrder, PageLocation}; + use parquet::format::PageLocation; use parquet::schema::types::{ ColumnDescriptor, ColumnPath, SchemaDescriptor, Type as parquetSchemaType, }; @@ -1316,80 +1304,57 @@ mod tests { } fn create_page_index() -> Result<(Vec, Vec)> { - let idx_float = Index::FLOAT(NativeIndex:: { - indexes: vec![ - PageIndex { - min: None, - max: None, - null_count: Some(1024), - repetition_level_histogram: None, - definition_level_histogram: None, - }, - PageIndex { - min: Some(0.0), - max: Some(10.0), - null_count: Some(0), - repetition_level_histogram: None, - definition_level_histogram: None, - }, - PageIndex { - min: Some(10.0), - max: Some(20.0), - null_count: Some(1), - repetition_level_histogram: None, - definition_level_histogram: None, - }, - PageIndex { - min: None, - max: None, - null_count: None, - repetition_level_histogram: None, - definition_level_histogram: None, - }, - ], - boundary_order: BoundaryOrder(0), // UNORDERED - }); - - let idx_string = Index::BYTE_ARRAY(NativeIndex:: { - indexes: vec![ - PageIndex { - min: Some("AA".into()), - max: Some("DD".into()), - null_count: Some(0), - repetition_level_histogram: None, - definition_level_histogram: None, - }, - PageIndex { - min: Some("DE".into()), - max: Some("DE".into()), - null_count: Some(0), - repetition_level_histogram: None, - definition_level_histogram: None, - }, - PageIndex { - min: Some("DF".into()), - max: Some("UJ".into()), - null_count: Some(1), - repetition_level_histogram: None, - definition_level_histogram: None, - }, - PageIndex { - min: None, - max: None, - null_count: Some(48), - 
repetition_level_histogram: None, - definition_level_histogram: None, - }, - PageIndex { - min: None, - max: None, - null_count: None, - repetition_level_histogram: None, - definition_level_histogram: None, - }, - ], - boundary_order: BoundaryOrder(0), // UNORDERED - }); + use parquet::file::page_index::column_index::{ByteArrayColumnIndex, PrimitiveColumnIndex}; + + // Float index with 4 pages + let float_min_bytes = vec![ + &[] as &[u8], // Page 0: all nulls + &0.0f32.to_le_bytes()[..], // Page 1: min=0.0 + &10.0f32.to_le_bytes()[..], // Page 2: min=10.0 + &[] as &[u8], // Page 3: all nulls + ]; + let float_max_bytes = vec![ + &[] as &[u8], // Page 0: all nulls + &10.0f32.to_le_bytes()[..], // Page 1: max=10.0 + &20.0f32.to_le_bytes()[..], // Page 2: max=20.0 + &[] as &[u8], // Page 3: all nulls + ]; + + let idx_float = Index::FLOAT(PrimitiveColumnIndex::try_new( + vec![true, false, false, true], // null_pages + BoundaryOrder::UNORDERED, + Some(vec![1024, 0, 1, -1]), // null_counts (-1 for unknown) + None, // repetition_level_histograms + None, // definition_level_histograms + float_min_bytes, + float_max_bytes, + ).map_err(|e| Error::new(ErrorKind::Unexpected, format!("Failed to create float index: {}", e)))?); + + // String index with 5 pages + let string_min_bytes = vec![ + "AA".as_bytes(), // Page 0 + "DE".as_bytes(), // Page 1 + "DF".as_bytes(), // Page 2 + &[] as &[u8], // Page 3: all nulls + &[] as &[u8], // Page 4: all nulls + ]; + let string_max_bytes = vec![ + "DD".as_bytes(), // Page 0 + "DE".as_bytes(), // Page 1 + "UJ".as_bytes(), // Page 2 + &[] as &[u8], // Page 3: all nulls + &[] as &[u8], // Page 4: all nulls + ]; + + let idx_string = Index::BYTE_ARRAY(ByteArrayColumnIndex::try_new( + vec![false, false, false, true, true], // null_pages + BoundaryOrder::UNORDERED, + Some(vec![0, 0, 1, 48, -1]), // null_counts (-1 for unknown) + None, // repetition_level_histograms + None, // definition_level_histograms + string_min_bytes, + string_max_bytes, + ).map_err(|e| Error::new(ErrorKind::Unexpected, format!("Failed to create string index: {}", e)))?); let page_locs_float = vec![ PageLocation::new(0, 1024, 0), diff --git a/crates/iceberg/src/writer/file_writer/parquet_writer.rs b/crates/iceberg/src/writer/file_writer/parquet_writer.rs index 620f27df33..ebf0f29e91 100644 --- a/crates/iceberg/src/writer/file_writer/parquet_writer.rs +++ b/crates/iceberg/src/writer/file_writer/parquet_writer.rs @@ -567,14 +567,7 @@ impl FileWriter for ParquetWriter { })?; Ok(vec![]) } else { - let parquet_metadata = - Arc::new(self.thrift_to_parquet_metadata(metadata).map_err(|err| { - Error::new( - ErrorKind::Unexpected, - "Failed to convert metadata from thrift to parquet.", - ) - .with_source(err) - })?); + let parquet_metadata = Arc::new(metadata); Ok(vec![Self::parquet_to_data_file_builder( self.schema, From e8215f33591a330e3d74694347cec9ee749383db Mon Sep 17 00:00:00 2001 From: Vukasin Stefanovic Date: Mon, 27 Oct 2025 14:02:29 +0100 Subject: [PATCH 2/4] scan API update; Fix reader; fix 0.57 arrow-rs integration --- crates/iceberg/src/arrow/reader.rs | 65 +++++++++-- .../src/expr/visitors/page_index_evaluator.rs | 102 +++++++----------- crates/iceberg/src/scan/mod.rs | 68 +++++++++++- 3 files changed, 165 insertions(+), 70 deletions(-) diff --git a/crates/iceberg/src/arrow/reader.rs b/crates/iceberg/src/arrow/reader.rs index 46ccd80d7c..6d5898dc7b 100644 --- a/crates/iceberg/src/arrow/reader.rs +++ b/crates/iceberg/src/arrow/reader.rs @@ -57,11 +57,11 @@ use crate::spec::{Datum, NestedField, 
PrimitiveType, Schema, Type}; use crate::utils::available_parallelism; use crate::{Error, ErrorKind}; -// Reserved field ID for the row ordinal (_pos) column per Iceberg spec -pub(crate) const RESERVED_FIELD_ID_POS: i32 = 2147483645; +/// Reserved field ID for the row ordinal (_pos) column per Iceberg spec +pub const RESERVED_FIELD_ID_POS: i32 = 2147483645; /// Column name for the row ordinal metadata column per Iceberg spec -pub(crate) const RESERVED_COL_NAME_POS: &str = "_pos"; +pub const RESERVED_COL_NAME_POS: &str = "_pos"; /// Builder to create ArrowReader pub struct ArrowReaderBuilder { @@ -206,11 +206,11 @@ impl ArrowReader { for metadata_column in metadata_columns { if metadata_column == RESERVED_COL_NAME_POS { let row_number_field = arrow_schema::Field::new(metadata_column, arrow_schema::DataType::Int64, false) - .with_extension_type(RowNumber) .with_metadata(std::collections::HashMap::from([( PARQUET_FIELD_ID_META_KEY.to_string(), RESERVED_FIELD_ID_POS.to_string(), - )])); + )])) + .with_extension_type(RowNumber); virtual_columns.push(row_number_field); } else { return Err(Error::new( @@ -224,10 +224,59 @@ impl ArrowReader { &task.data_file_path, file_io.clone(), should_load_page_index, - virtual_columns, + virtual_columns.clone(), ) .await?; + // Extract field IDs from virtual columns and create Iceberg fields + let mut virtual_iceberg_fields = vec![]; + let mut virtual_field_ids = vec![]; + + for field in &virtual_columns { + let field_id_str = field.metadata().get(PARQUET_FIELD_ID_META_KEY) + .ok_or_else(|| Error::new( + ErrorKind::Unexpected, + format!("Virtual field '{}' missing field ID metadata", field.name()), + ))?; + let field_id = field_id_str.parse::()?; + + // Create an Iceberg NestedField for the virtual column + if field_id == RESERVED_FIELD_ID_POS { + virtual_field_ids.push(field_id); + let iceberg_field = NestedField::required( + field_id, + field.name(), + Type::Primitive(PrimitiveType::Long), + ); + virtual_iceberg_fields.push(Arc::new(iceberg_field)); + } else { + return Err(Error::new( + ErrorKind::FeatureUnsupported, // TODO @vustef: This is appropriate only for columns from iceberg spec. + format!("Field ID '{}' not supported", field_id), + )); + } + } + + // Create an extended schema that includes both regular fields and virtual fields + let extended_schema = if !virtual_iceberg_fields.is_empty() { + let mut all_fields: Vec<_> = task.schema.as_ref().as_struct().fields().iter().cloned().collect(); + all_fields.extend(virtual_iceberg_fields); + + Arc::new( + Schema::builder() + .with_schema_id(task.schema.schema_id()) + .with_fields(all_fields) + .build() + .map_err(|e| Error::new(ErrorKind::Unexpected, format!("Failed to build extended schema: {}", e)))? 
+ ) + } else { + task.schema_ref() + }; + + // Combine regular field IDs with virtual field IDs + let mut all_field_ids = task.project_field_ids.clone(); + all_field_ids.extend(virtual_field_ids); + // Create a projection mask for the batch stream to select which columns in the // Parquet file that we want in the response let projection_mask = Self::get_arrow_projection_mask( @@ -242,7 +291,7 @@ impl ArrowReader { // that come back from the file, such as type promotion, default column insertion // and column re-ordering let mut record_batch_transformer = - RecordBatchTransformer::build(task.schema_ref(), task.project_field_ids()); + RecordBatchTransformer::build(extended_schema, &all_field_ids); if let Some(batch_size) = batch_size { record_batch_stream_builder = record_batch_stream_builder.with_batch_size(batch_size); @@ -1617,7 +1666,7 @@ message schema { assert_eq!(err.kind(), ErrorKind::DataInvalid); assert_eq!( err.to_string(), - "DataInvalid => Unsupported Arrow data type: Duration(Microsecond)".to_string() + "DataInvalid => Unsupported Arrow data type: Duration(µs)".to_string() ); // Omitting field c2, we still get an error due to c3 being selected diff --git a/crates/iceberg/src/expr/visitors/page_index_evaluator.rs b/crates/iceberg/src/expr/visitors/page_index_evaluator.rs index bb883e4ab4..3cf8a5269b 100644 --- a/crates/iceberg/src/expr/visitors/page_index_evaluator.rs +++ b/crates/iceberg/src/expr/visitors/page_index_evaluator.rs @@ -780,12 +780,10 @@ mod tests { use parquet::arrow::arrow_reader::RowSelector; use parquet::basic::{BoundaryOrder, LogicalType as ParquetLogicalType, Type as ParquetPhysicalType}; - use parquet::data_type::ByteArray; - use parquet::file::metadata::{ColumnChunkMetaData, RowGroupMetaData}; - use parquet::file::page_index::column_index::{ColumnIndexMetaData as Index, PrimitiveColumnIndex as NativeIndex}; - use parquet::file::page_index::offset_index::OffsetIndexMetaData; + use parquet::file::metadata::{ColumnChunkMetaData, ColumnIndexBuilder, RowGroupMetaData}; + use parquet::file::page_index::column_index::ColumnIndexMetaData as Index; + use parquet::file::page_index::offset_index::{OffsetIndexMetaData, PageLocation}; use parquet::file::statistics::Statistics; - use parquet::format::PageLocation; use parquet::schema::types::{ ColumnDescriptor, ColumnPath, SchemaDescriptor, Type as parquetSchemaType, }; @@ -1304,71 +1302,53 @@ mod tests { } fn create_page_index() -> Result<(Vec, Vec)> { - use parquet::file::page_index::column_index::{ByteArrayColumnIndex, PrimitiveColumnIndex}; - // Float index with 4 pages - let float_min_bytes = vec![ - &[] as &[u8], // Page 0: all nulls - &0.0f32.to_le_bytes()[..], // Page 1: min=0.0 - &10.0f32.to_le_bytes()[..], // Page 2: min=10.0 - &[] as &[u8], // Page 3: all nulls - ]; - let float_max_bytes = vec![ - &[] as &[u8], // Page 0: all nulls - &10.0f32.to_le_bytes()[..], // Page 1: max=10.0 - &20.0f32.to_le_bytes()[..], // Page 2: max=20.0 - &[] as &[u8], // Page 3: all nulls - ]; + let mut float_builder = ColumnIndexBuilder::new(ParquetPhysicalType::FLOAT); + float_builder.set_boundary_order(BoundaryOrder::UNORDERED); - let idx_float = Index::FLOAT(PrimitiveColumnIndex::try_new( - vec![true, false, false, true], // null_pages - BoundaryOrder::UNORDERED, - Some(vec![1024, 0, 1, -1]), // null_counts (-1 for unknown) - None, // repetition_level_histograms - None, // definition_level_histograms - float_min_bytes, - float_max_bytes, - ).map_err(|e| Error::new(ErrorKind::Unexpected, format!("Failed to create float index: 
{}", e)))?); + // Page 0: all nulls + float_builder.append(true, vec![], vec![], 1024); + // Page 1: min=0.0, max=10.0, null_count=0 + float_builder.append(false, 0.0f32.to_le_bytes().to_vec(), 10.0f32.to_le_bytes().to_vec(), 0); + // Page 2: min=10.0, max=20.0, null_count=1 + float_builder.append(false, 10.0f32.to_le_bytes().to_vec(), 20.0f32.to_le_bytes().to_vec(), 1); + // Page 3: all nulls, null_count=-1 (unknown) + float_builder.append(true, vec![], vec![], -1); - // String index with 5 pages - let string_min_bytes = vec![ - "AA".as_bytes(), // Page 0 - "DE".as_bytes(), // Page 1 - "DF".as_bytes(), // Page 2 - &[] as &[u8], // Page 3: all nulls - &[] as &[u8], // Page 4: all nulls - ]; - let string_max_bytes = vec![ - "DD".as_bytes(), // Page 0 - "DE".as_bytes(), // Page 1 - "UJ".as_bytes(), // Page 2 - &[] as &[u8], // Page 3: all nulls - &[] as &[u8], // Page 4: all nulls - ]; + let idx_float = float_builder.build() + .map_err(|e| crate::Error::new(crate::ErrorKind::Unexpected, format!("Failed to create float index: {}", e)))?; - let idx_string = Index::BYTE_ARRAY(ByteArrayColumnIndex::try_new( - vec![false, false, false, true, true], // null_pages - BoundaryOrder::UNORDERED, - Some(vec![0, 0, 1, 48, -1]), // null_counts (-1 for unknown) - None, // repetition_level_histograms - None, // definition_level_histograms - string_min_bytes, - string_max_bytes, - ).map_err(|e| Error::new(ErrorKind::Unexpected, format!("Failed to create string index: {}", e)))?); + // String index with 5 pages + let mut string_builder = ColumnIndexBuilder::new(ParquetPhysicalType::BYTE_ARRAY); + string_builder.set_boundary_order(BoundaryOrder::UNORDERED); + + // Page 0: "AA" to "DD", null_count=0 + string_builder.append(false, "AA".as_bytes().to_vec(), "DD".as_bytes().to_vec(), 0); + // Page 1: "DE" to "DE", null_count=0 + string_builder.append(false, "DE".as_bytes().to_vec(), "DE".as_bytes().to_vec(), 0); + // Page 2: "DF" to "UJ", null_count=1 + string_builder.append(false, "DF".as_bytes().to_vec(), "UJ".as_bytes().to_vec(), 1); + // Page 3: all nulls, null_count=48 + string_builder.append(true, vec![], vec![], 48); + // Page 4: all nulls, null_count=-1 (unknown) + string_builder.append(true, vec![], vec![], -1); + + let idx_string = string_builder.build() + .map_err(|e| crate::Error::new(crate::ErrorKind::Unexpected, format!("Failed to create string index: {}", e)))?; let page_locs_float = vec![ - PageLocation::new(0, 1024, 0), - PageLocation::new(1024, 1024, 1024), - PageLocation::new(2048, 1024, 2048), - PageLocation::new(3072, 1024, 3072), + PageLocation { offset: 0, compressed_page_size: 1024, first_row_index: 0 }, + PageLocation { offset: 1024, compressed_page_size: 1024, first_row_index: 1024 }, + PageLocation { offset: 2048, compressed_page_size: 1024, first_row_index: 2048 }, + PageLocation { offset: 3072, compressed_page_size: 1024, first_row_index: 3072 }, ]; let page_locs_string = vec![ - PageLocation::new(0, 512, 0), - PageLocation::new(512, 512, 512), - PageLocation::new(1024, 2976, 1024), - PageLocation::new(4000, 48, 4000), - PageLocation::new(4048, 48, 4048), + PageLocation { offset: 0, compressed_page_size: 512, first_row_index: 0 }, + PageLocation { offset: 512, compressed_page_size: 512, first_row_index: 512 }, + PageLocation { offset: 1024, compressed_page_size: 2976, first_row_index: 1024 }, + PageLocation { offset: 4000, compressed_page_size: 48, first_row_index: 4000 }, + PageLocation { offset: 4048, compressed_page_size: 48, first_row_index: 4048 }, ]; Ok((vec![idx_float, 
idx_string], vec![ diff --git a/crates/iceberg/src/scan/mod.rs b/crates/iceberg/src/scan/mod.rs index 3d14b3cce4..11cedc90d5 100644 --- a/crates/iceberg/src/scan/mod.rs +++ b/crates/iceberg/src/scan/mod.rs @@ -31,6 +31,7 @@ use futures::stream::BoxStream; use futures::{SinkExt, StreamExt, TryStreamExt}; pub use task::*; +pub use crate::arrow::{RESERVED_COL_NAME_POS, RESERVED_FIELD_ID_POS}; // TODO @vustef: These should rather be defined here or in the spec mod use crate::arrow::ArrowReaderBuilder; use crate::delete_file_index::DeleteFileIndex; use crate::expr::visitors::inclusive_metrics_evaluator::InclusiveMetricsEvaluator; @@ -59,6 +60,7 @@ pub struct TableScanBuilder<'a> { concurrency_limit_manifest_files: usize, row_group_filtering_enabled: bool, row_selection_enabled: bool, + metadata_columns: Vec, } impl<'a> TableScanBuilder<'a> { @@ -77,6 +79,7 @@ impl<'a> TableScanBuilder<'a> { concurrency_limit_manifest_files: num_cpus, row_group_filtering_enabled: true, row_selection_enabled: false, + metadata_columns: vec![], } } @@ -183,6 +186,14 @@ impl<'a> TableScanBuilder<'a> { self } + /// Sets the metadata columns to include in the result + /// + /// Metadata columns are virtual columns that provide metadata about the rows, + /// such as file paths or row positions. These come from https://iceberg.apache.org/spec/#identifier-field-ids + pub fn with_metadata_columns(mut self, metadata_columns: Vec) -> Self { + self.metadata_columns = metadata_columns; + self + } /// Build the table scan. pub fn build(self) -> Result { let snapshot = match self.snapshot_id { @@ -209,6 +220,7 @@ impl<'a> TableScanBuilder<'a> { concurrency_limit_manifest_files: self.concurrency_limit_manifest_files, row_group_filtering_enabled: self.row_group_filtering_enabled, row_selection_enabled: self.row_selection_enabled, + metadata_columns: self.metadata_columns, }); }; current_snapshot_id.clone() @@ -299,6 +311,7 @@ impl<'a> TableScanBuilder<'a> { concurrency_limit_manifest_files: self.concurrency_limit_manifest_files, row_group_filtering_enabled: self.row_group_filtering_enabled, row_selection_enabled: self.row_selection_enabled, + metadata_columns: self.metadata_columns, }) } } @@ -327,6 +340,7 @@ pub struct TableScan { row_group_filtering_enabled: bool, row_selection_enabled: bool, + metadata_columns: Vec, } impl TableScan { @@ -431,7 +445,8 @@ impl TableScan { let mut arrow_reader_builder = ArrowReaderBuilder::new(self.file_io.clone()) .with_data_file_concurrency_limit(self.concurrency_limit_data_files) .with_row_group_filtering_enabled(self.row_group_filtering_enabled) - .with_row_selection_enabled(self.row_selection_enabled); + .with_row_selection_enabled(self.row_selection_enabled) + .with_metadata_columns(self.metadata_columns.clone()); if let Some(batch_size) = self.batch_size { arrow_reader_builder = arrow_reader_builder.with_batch_size(batch_size); @@ -577,6 +592,7 @@ pub mod tests { use tera::{Context, Tera}; use uuid::Uuid; + use super::RESERVED_COL_NAME_POS; use crate::TableIdent; use crate::arrow::ArrowReaderBuilder; use crate::expr::{BoundPredicate, Reference}; @@ -1303,6 +1319,56 @@ pub mod tests { assert_eq!(int64_arr.value(0), 1); } + #[tokio::test] + async fn test_open_parquet_with_row_numbers() { + let mut fixture = TableTestFixture::new(); + fixture.setup_manifest_files().await; + + // Create table scan for current snapshot and plan files + let table_scan = fixture + .table + .scan() + .with_metadata_columns(vec![RESERVED_COL_NAME_POS.to_string()]) + .build() + .unwrap(); + + let batch_stream = 
table_scan.to_arrow().await.unwrap(); + + let batches: Vec<_> = batch_stream.try_collect().await.unwrap(); + + // Verify we have batches + assert!(!batches.is_empty(), "Expected at least one batch"); + + // Check that the _pos column exists + let batch = &batches[0]; + let pos_col = batch + .column_by_name(RESERVED_COL_NAME_POS) + .expect("Expected _pos column to exist"); + + // Verify it's an Int64Array + let pos_arr = pos_col + .as_any() + .downcast_ref::() + .expect("Expected _pos column to be Int64Array"); + + // Verify row numbers start at 0 and are sequential + assert_eq!(pos_arr.len(), batch.num_rows()); + for i in 0..pos_arr.len() { + assert_eq!( + pos_arr.value(i), + i as i64, + "Row number at index {} should be {}", + i, + i + ); + } + + // Also verify the data column still works + let col = batch.column_by_name("x").unwrap(); + let int64_arr = col.as_any().downcast_ref::().unwrap(); + assert_eq!(int64_arr.value(0), 1); + } + #[tokio::test] async fn test_open_parquet_no_deletions_by_separate_reader() { let mut fixture = TableTestFixture::new(); From a13f0a8a5ae23ce2725b74471e5f380f8f277c81 Mon Sep 17 00:00:00 2001 From: Vukasin Stefanovic Date: Mon, 27 Oct 2025 14:03:40 +0100 Subject: [PATCH 3/4] fix baselines after 0.57 upgrade --- crates/iceberg/src/inspect/manifests.rs | 24 ++++++++++++------------ crates/iceberg/src/inspect/snapshots.rs | 12 ++++++------ 2 files changed, 18 insertions(+), 18 deletions(-) diff --git a/crates/iceberg/src/inspect/manifests.rs b/crates/iceberg/src/inspect/manifests.rs index 60854b8bae..d85d9fe834 100644 --- a/crates/iceberg/src/inspect/manifests.rs +++ b/crates/iceberg/src/inspect/manifests.rs @@ -296,18 +296,18 @@ mod tests { check_record_batches( record_batch.try_collect::>().await.unwrap(), expect![[r#" - Field { name: "content", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "14"} }, - Field { name: "path", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "1"} }, - Field { name: "length", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "2"} }, - Field { name: "partition_spec_id", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "3"} }, - Field { name: "added_snapshot_id", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "4"} }, - Field { name: "added_data_files_count", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "5"} }, - Field { name: "existing_data_files_count", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "6"} }, - Field { name: "deleted_data_files_count", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "7"} }, - Field { name: "added_delete_files_count", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "15"} }, - Field { name: "existing_delete_files_count", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "16"} }, - Field { name: "deleted_delete_files_count", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "17"} }, - Field { name: "partition_summaries", data_type: List(Field { name: "item", data_type: Struct([Field { name: "contains_null", data_type: 
Boolean, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "10"} }, Field { name: "contains_nan", data_type: Boolean, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "11"} }, Field { name: "lower_bound", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "12"} }, Field { name: "upper_bound", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "13"} }]), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "9"} }), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "8"} }"#]], + Field { "content": Int32, metadata: {"PARQUET:field_id": "14"} }, + Field { "path": Utf8, metadata: {"PARQUET:field_id": "1"} }, + Field { "length": Int64, metadata: {"PARQUET:field_id": "2"} }, + Field { "partition_spec_id": Int32, metadata: {"PARQUET:field_id": "3"} }, + Field { "added_snapshot_id": Int64, metadata: {"PARQUET:field_id": "4"} }, + Field { "added_data_files_count": Int32, metadata: {"PARQUET:field_id": "5"} }, + Field { "existing_data_files_count": Int32, metadata: {"PARQUET:field_id": "6"} }, + Field { "deleted_data_files_count": Int32, metadata: {"PARQUET:field_id": "7"} }, + Field { "added_delete_files_count": Int32, metadata: {"PARQUET:field_id": "15"} }, + Field { "existing_delete_files_count": Int32, metadata: {"PARQUET:field_id": "16"} }, + Field { "deleted_delete_files_count": Int32, metadata: {"PARQUET:field_id": "17"} }, + Field { "partition_summaries": List(Struct("contains_null": Boolean, metadata: {"PARQUET:field_id": "10"}, "contains_nan": nullable Boolean, metadata: {"PARQUET:field_id": "11"}, "lower_bound": nullable Utf8, metadata: {"PARQUET:field_id": "12"}, "upper_bound": nullable Utf8, metadata: {"PARQUET:field_id": "13"}), metadata: {"PARQUET:field_id": "9"}), metadata: {"PARQUET:field_id": "8"} }"#]], expect![[r#" content: PrimitiveArray [ diff --git a/crates/iceberg/src/inspect/snapshots.rs b/crates/iceberg/src/inspect/snapshots.rs index 6081ec165b..37ea684c88 100644 --- a/crates/iceberg/src/inspect/snapshots.rs +++ b/crates/iceberg/src/inspect/snapshots.rs @@ -151,12 +151,12 @@ mod tests { check_record_batches( batch_stream.try_collect::>().await.unwrap(), expect![[r#" - Field { name: "committed_at", data_type: Timestamp(Microsecond, Some("+00:00")), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "1"} }, - Field { name: "snapshot_id", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "2"} }, - Field { name: "parent_id", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "3"} }, - Field { name: "operation", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "4"} }, - Field { name: "manifest_list", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "5"} }, - Field { name: "summary", data_type: Map(Field { name: "key_value", data_type: Struct([Field { name: "key", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "7"} }, Field { name: "value", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "8"} }]), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, false), nullable: true, dict_id: 0, dict_is_ordered: 
false, metadata: {"PARQUET:field_id": "6"} }"#]], + Field { "committed_at": Timestamp(µs, "+00:00"), metadata: {"PARQUET:field_id": "1"} }, + Field { "snapshot_id": Int64, metadata: {"PARQUET:field_id": "2"} }, + Field { "parent_id": nullable Int64, metadata: {"PARQUET:field_id": "3"} }, + Field { "operation": nullable Utf8, metadata: {"PARQUET:field_id": "4"} }, + Field { "manifest_list": nullable Utf8, metadata: {"PARQUET:field_id": "5"} }, + Field { "summary": nullable Map("key_value": Struct("key": Utf8, metadata: {"PARQUET:field_id": "7"}, "value": nullable Utf8, metadata: {"PARQUET:field_id": "8"}), unsorted), metadata: {"PARQUET:field_id": "6"} }"#]], expect![[r#" committed_at: PrimitiveArray [ From ee34c86757cd3e26d8b470332abab381a7542e43 Mon Sep 17 00:00:00 2001 From: Vukasin Stefanovic Date: Mon, 27 Oct 2025 14:04:47 +0100 Subject: [PATCH 4/4] fix 'microseconds' --- crates/iceberg/src/inspect/snapshots.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/iceberg/src/inspect/snapshots.rs b/crates/iceberg/src/inspect/snapshots.rs index 37ea684c88..479478b074 100644 --- a/crates/iceberg/src/inspect/snapshots.rs +++ b/crates/iceberg/src/inspect/snapshots.rs @@ -158,7 +158,7 @@ mod tests { Field { "manifest_list": nullable Utf8, metadata: {"PARQUET:field_id": "5"} }, Field { "summary": nullable Map("key_value": Struct("key": Utf8, metadata: {"PARQUET:field_id": "7"}, "value": nullable Utf8, metadata: {"PARQUET:field_id": "8"}), unsorted), metadata: {"PARQUET:field_id": "6"} }"#]], expect![[r#" - committed_at: PrimitiveArray + committed_at: PrimitiveArray [ 2018-01-04T21:22:35.770+00:00, 2019-04-12T20:29:15.770+00:00,