diff --git a/Cargo.lock b/Cargo.lock index 04265323d3..4e5e796be9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1374,7 +1374,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "117725a109d387c937a1533ce01b450cbde6b88abceea8473c4d7a85853cda3c" dependencies = [ "lazy_static", - "windows-sys 0.48.0", + "windows-sys 0.59.0", ] [[package]] @@ -6649,7 +6649,7 @@ version = "0.1.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cf221c93e13a30d793f7645a0e7762c55d169dbb0a49671918a2319d289b10bb" dependencies = [ - "windows-sys 0.48.0", + "windows-sys 0.59.0", ] [[package]] diff --git a/crates/iceberg/src/inspect/manifests.rs b/crates/iceberg/src/inspect/manifests.rs index 7cf82fc823..ab63d2f6ee 100644 --- a/crates/iceberg/src/inspect/manifests.rs +++ b/crates/iceberg/src/inspect/manifests.rs @@ -23,7 +23,9 @@ use arrow_array::builder::{ use arrow_array::types::{Int32Type, Int64Type, Int8Type}; use arrow_array::RecordBatch; use arrow_schema::{DataType, Field, Fields, Schema}; +use futures::{stream, StreamExt}; +use crate::scan::ArrowRecordBatchStream; use crate::table::Table; use crate::Result; @@ -38,7 +40,7 @@ impl<'a> ManifestsTable<'a> { Self { table } } - fn partition_summary_fields(&self) -> Vec { + fn partition_summary_fields() -> Vec { vec![ Field::new("contains_null", DataType::Boolean, false), Field::new("contains_nan", DataType::Boolean, true), @@ -65,7 +67,7 @@ impl<'a> ManifestsTable<'a> { "partition_summaries", DataType::List(Arc::new(Field::new_struct( "item", - self.partition_summary_fields(), + Self::partition_summary_fields(), false, ))), false, @@ -74,7 +76,7 @@ impl<'a> ManifestsTable<'a> { } /// Scans the manifests table. - pub async fn scan(&self) -> Result { + pub async fn scan(&self) -> Result { let mut content = PrimitiveBuilder::::new(); let mut path = StringBuilder::new(); let mut length = PrimitiveBuilder::::new(); @@ -87,12 +89,12 @@ impl<'a> ManifestsTable<'a> { let mut existing_delete_files_count = PrimitiveBuilder::::new(); let mut deleted_delete_files_count = PrimitiveBuilder::::new(); let mut partition_summaries = ListBuilder::new(StructBuilder::from_fields( - Fields::from(self.partition_summary_fields()), + Fields::from(Self::partition_summary_fields()), 0, )) .with_field(Arc::new(Field::new_struct( "item", - self.partition_summary_fields(), + Self::partition_summary_fields(), false, ))); @@ -142,7 +144,7 @@ impl<'a> ManifestsTable<'a> { } } - Ok(RecordBatch::try_new(Arc::new(self.schema()), vec![ + let batch = RecordBatch::try_new(Arc::new(self.schema()), vec![ Arc::new(content.finish()), Arc::new(path.finish()), Arc::new(length.finish()), @@ -155,7 +157,9 @@ impl<'a> ManifestsTable<'a> { Arc::new(existing_delete_files_count.finish()), Arc::new(deleted_delete_files_count.finish()), Arc::new(partition_summaries.finish()), - ])?) + ])?; + + Ok(stream::iter(vec![Ok(batch)]).boxed()) } } @@ -163,7 +167,7 @@ impl<'a> ManifestsTable<'a> { mod tests { use expect_test::expect; - use crate::inspect::metadata_table::tests::check_record_batch; + use crate::inspect::metadata_table::tests::check_record_batches; use crate::scan::tests::TableTestFixture; #[tokio::test] @@ -171,10 +175,10 @@ mod tests { let mut fixture = TableTestFixture::new(); fixture.setup_manifest_files().await; - let record_batch = fixture.table.inspect().manifests().scan().await.unwrap(); + let batch_stream = fixture.table.inspect().manifests().scan().await.unwrap(); - check_record_batch( - record_batch, + check_record_batches( + batch_stream, expect![[r#" Field { name: "content", data_type: Int8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "path", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, @@ -259,6 +263,6 @@ mod tests { ]"#]], &["path", "length"], Some("path"), - ); + ).await; } } diff --git a/crates/iceberg/src/inspect/metadata_table.rs b/crates/iceberg/src/inspect/metadata_table.rs index 2e6055be85..75dbc74725 100644 --- a/crates/iceberg/src/inspect/metadata_table.rs +++ b/crates/iceberg/src/inspect/metadata_table.rs @@ -25,31 +25,33 @@ use crate::table::Table; /// - /// - #[derive(Debug)] -pub struct MetadataTable(Table); +pub struct MetadataTable<'a>(&'a Table); -impl MetadataTable { +impl<'a> MetadataTable<'a> { /// Creates a new metadata scan. - pub fn new(table: Table) -> Self { + pub fn new(table: &'a Table) -> Self { Self(table) } /// Get the snapshots table. pub fn snapshots(&self) -> SnapshotsTable { - SnapshotsTable::new(&self.0) + SnapshotsTable::new(self.0) } /// Get the manifests table. pub fn manifests(&self) -> ManifestsTable { - ManifestsTable::new(&self.0) + ManifestsTable::new(self.0) } } #[cfg(test)] pub mod tests { - use arrow_array::RecordBatch; use expect_test::Expect; + use futures::TryStreamExt; use itertools::Itertools; + use crate::scan::ArrowRecordBatchStream; + /// Snapshot testing to check the resulting record batch. /// /// - `expected_schema/data`: put `expect![[""]]` as a placeholder, @@ -58,13 +60,21 @@ pub mod tests { /// Check the doc of [`expect_test`] for more details. /// - `ignore_check_columns`: Some columns are not stable, so we can skip them. /// - `sort_column`: The order of the data might be non-deterministic, so we can sort it by a column. - pub fn check_record_batch( - record_batch: RecordBatch, + pub async fn check_record_batches( + batch_stream: ArrowRecordBatchStream, expected_schema: Expect, expected_data: Expect, ignore_check_columns: &[&str], sort_column: Option<&str>, ) { + let record_batches = batch_stream.try_collect::>().await.unwrap(); + assert!(!record_batches.is_empty(), "Empty record batches"); + + // Combine record batches using the first batch's schema + let first_batch = record_batches.first().unwrap(); + let record_batch = + arrow_select::concat::concat_batches(&first_batch.schema(), &record_batches).unwrap(); + let mut columns = record_batch.columns().to_vec(); if let Some(sort_column) = sort_column { let column = record_batch.column_by_name(sort_column).unwrap(); diff --git a/crates/iceberg/src/inspect/snapshots.rs b/crates/iceberg/src/inspect/snapshots.rs index c2b079dda4..1ee89963d6 100644 --- a/crates/iceberg/src/inspect/snapshots.rs +++ b/crates/iceberg/src/inspect/snapshots.rs @@ -21,7 +21,9 @@ use arrow_array::builder::{MapBuilder, PrimitiveBuilder, StringBuilder}; use arrow_array::types::{Int64Type, TimestampMillisecondType}; use arrow_array::RecordBatch; use arrow_schema::{DataType, Field, Schema, TimeUnit}; +use futures::{stream, StreamExt}; +use crate::scan::ArrowRecordBatchStream; use crate::table::Table; use crate::Result; @@ -70,7 +72,7 @@ impl<'a> SnapshotsTable<'a> { } /// Scans the snapshots table. - pub fn scan(&self) -> Result { + pub async fn scan(&self) -> Result { let mut committed_at = PrimitiveBuilder::::new().with_timezone("+00:00"); let mut snapshot_id = PrimitiveBuilder::::new(); @@ -92,14 +94,16 @@ impl<'a> SnapshotsTable<'a> { summary.append(true)?; } - Ok(RecordBatch::try_new(Arc::new(self.schema()), vec![ + let batch = RecordBatch::try_new(Arc::new(self.schema()), vec![ Arc::new(committed_at.finish()), Arc::new(snapshot_id.finish()), Arc::new(parent_id.finish()), Arc::new(operation.finish()), Arc::new(manifest_list.finish()), Arc::new(summary.finish()), - ])?) + ])?; + + Ok(stream::iter(vec![Ok(batch)]).boxed()) } } @@ -107,15 +111,17 @@ impl<'a> SnapshotsTable<'a> { mod tests { use expect_test::expect; - use crate::inspect::metadata_table::tests::check_record_batch; + use crate::inspect::metadata_table::tests::check_record_batches; use crate::scan::tests::TableTestFixture; - #[test] - fn test_snapshots_table() { + #[tokio::test] + async fn test_snapshots_table() { let table = TableTestFixture::new().table; - let record_batch = table.inspect().snapshots().scan().unwrap(); - check_record_batch( - record_batch, + + let batch_stream = table.inspect().snapshots().scan().await.unwrap(); + + check_record_batches( + batch_stream, expect![[r#" Field { name: "committed_at", data_type: Timestamp(Millisecond, Some("+00:00")), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "snapshot_id", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, @@ -178,6 +184,6 @@ mod tests { ]"#]], &["manifest_list"], Some("committed_at"), - ); + ).await; } } diff --git a/crates/iceberg/src/table.rs b/crates/iceberg/src/table.rs index b53990ed28..ebee670f45 100644 --- a/crates/iceberg/src/table.rs +++ b/crates/iceberg/src/table.rs @@ -203,7 +203,7 @@ impl Table { /// Creates a metadata table which provides table-like APIs for inspecting metadata. /// See [`MetadataTable`] for more details. - pub fn inspect(self) -> MetadataTable { + pub fn inspect(&self) -> MetadataTable<'_> { MetadataTable::new(self) }