From 66bc85a44031161ecec1af796f84357b33aeaea7 Mon Sep 17 00:00:00 2001 From: Willi Raschkowski Date: Fri, 3 Jan 2025 15:02:35 +0100 Subject: [PATCH 1/5] Metadata table scans as streams --- crates/iceberg/src/inspect/manifests.rs | 49 +++++++++++++------- crates/iceberg/src/inspect/metadata_table.rs | 26 +++++++---- crates/iceberg/src/inspect/snapshots.rs | 37 ++++++++++----- crates/iceberg/src/table.rs | 2 +- 4 files changed, 78 insertions(+), 36 deletions(-) diff --git a/crates/iceberg/src/inspect/manifests.rs b/crates/iceberg/src/inspect/manifests.rs index 7cf82fc823..627af00cc9 100644 --- a/crates/iceberg/src/inspect/manifests.rs +++ b/crates/iceberg/src/inspect/manifests.rs @@ -22,8 +22,12 @@ use arrow_array::builder::{ }; use arrow_array::types::{Int32Type, Int64Type, Int8Type}; use arrow_array::RecordBatch; -use arrow_schema::{DataType, Field, Fields, Schema}; +use arrow_schema::{DataType, Field, Fields, Schema, SchemaRef}; +use futures::StreamExt; +use crate::io::FileIO; +use crate::scan::ArrowRecordBatchStream; +use crate::spec::TableMetadata; use crate::table::Table; use crate::Result; @@ -38,7 +42,7 @@ impl<'a> ManifestsTable<'a> { Self { table } } - fn partition_summary_fields(&self) -> Vec { + fn partition_summary_fields() -> Vec { vec![ Field::new("contains_null", DataType::Boolean, false), Field::new("contains_nan", DataType::Boolean, true), @@ -65,7 +69,7 @@ impl<'a> ManifestsTable<'a> { "partition_summaries", DataType::List(Arc::new(Field::new_struct( "item", - self.partition_summary_fields(), + Self::partition_summary_fields(), false, ))), false, @@ -74,7 +78,22 @@ impl<'a> ManifestsTable<'a> { } /// Scans the manifests table. - pub async fn scan(&self) -> Result { + pub async fn scan(&self) -> Result { + let arrow_schema = Arc::new(self.schema()); + let table_metadata = self.table.metadata_ref(); + let file_io = self.table.file_io().clone(); + + Ok(futures::stream::once(async move { + Self::build_batch(arrow_schema, &table_metadata, &file_io).await + }) + .boxed()) + } + + async fn build_batch( + arrow_schema: SchemaRef, + table_metadata: &TableMetadata, + file_io: &FileIO, + ) -> Result { let mut content = PrimitiveBuilder::::new(); let mut path = StringBuilder::new(); let mut length = PrimitiveBuilder::::new(); @@ -87,19 +106,17 @@ impl<'a> ManifestsTable<'a> { let mut existing_delete_files_count = PrimitiveBuilder::::new(); let mut deleted_delete_files_count = PrimitiveBuilder::::new(); let mut partition_summaries = ListBuilder::new(StructBuilder::from_fields( - Fields::from(self.partition_summary_fields()), + Fields::from(Self::partition_summary_fields()), 0, )) .with_field(Arc::new(Field::new_struct( "item", - self.partition_summary_fields(), + Self::partition_summary_fields(), false, ))); - if let Some(snapshot) = self.table.metadata().current_snapshot() { - let manifest_list = snapshot - .load_manifest_list(self.table.file_io(), &self.table.metadata_ref()) - .await?; + if let Some(snapshot) = table_metadata.current_snapshot() { + let manifest_list = snapshot.load_manifest_list(file_io, table_metadata).await?; for manifest in manifest_list.entries() { content.append_value(manifest.content as i8); path.append_value(manifest.manifest_path.clone()); @@ -142,7 +159,7 @@ impl<'a> ManifestsTable<'a> { } } - Ok(RecordBatch::try_new(Arc::new(self.schema()), vec![ + Ok(RecordBatch::try_new(arrow_schema, vec![ Arc::new(content.finish()), Arc::new(path.finish()), Arc::new(length.finish()), @@ -163,7 +180,7 @@ impl<'a> ManifestsTable<'a> { mod tests { use expect_test::expect; - use crate::inspect::metadata_table::tests::check_record_batch; + use crate::inspect::metadata_table::tests::check_record_batches; use crate::scan::tests::TableTestFixture; #[tokio::test] @@ -171,10 +188,10 @@ mod tests { let mut fixture = TableTestFixture::new(); fixture.setup_manifest_files().await; - let record_batch = fixture.table.inspect().manifests().scan().await.unwrap(); + let batch_stream = fixture.table.inspect().manifests().scan().await.unwrap(); - check_record_batch( - record_batch, + check_record_batches( + batch_stream, expect![[r#" Field { name: "content", data_type: Int8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "path", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, @@ -259,6 +276,6 @@ mod tests { ]"#]], &["path", "length"], Some("path"), - ); + ).await; } } diff --git a/crates/iceberg/src/inspect/metadata_table.rs b/crates/iceberg/src/inspect/metadata_table.rs index 2e6055be85..75dbc74725 100644 --- a/crates/iceberg/src/inspect/metadata_table.rs +++ b/crates/iceberg/src/inspect/metadata_table.rs @@ -25,31 +25,33 @@ use crate::table::Table; /// - /// - #[derive(Debug)] -pub struct MetadataTable(Table); +pub struct MetadataTable<'a>(&'a Table); -impl MetadataTable { +impl<'a> MetadataTable<'a> { /// Creates a new metadata scan. - pub fn new(table: Table) -> Self { + pub fn new(table: &'a Table) -> Self { Self(table) } /// Get the snapshots table. pub fn snapshots(&self) -> SnapshotsTable { - SnapshotsTable::new(&self.0) + SnapshotsTable::new(self.0) } /// Get the manifests table. pub fn manifests(&self) -> ManifestsTable { - ManifestsTable::new(&self.0) + ManifestsTable::new(self.0) } } #[cfg(test)] pub mod tests { - use arrow_array::RecordBatch; use expect_test::Expect; + use futures::TryStreamExt; use itertools::Itertools; + use crate::scan::ArrowRecordBatchStream; + /// Snapshot testing to check the resulting record batch. /// /// - `expected_schema/data`: put `expect![[""]]` as a placeholder, @@ -58,13 +60,21 @@ pub mod tests { /// Check the doc of [`expect_test`] for more details. /// - `ignore_check_columns`: Some columns are not stable, so we can skip them. /// - `sort_column`: The order of the data might be non-deterministic, so we can sort it by a column. - pub fn check_record_batch( - record_batch: RecordBatch, + pub async fn check_record_batches( + batch_stream: ArrowRecordBatchStream, expected_schema: Expect, expected_data: Expect, ignore_check_columns: &[&str], sort_column: Option<&str>, ) { + let record_batches = batch_stream.try_collect::>().await.unwrap(); + assert!(!record_batches.is_empty(), "Empty record batches"); + + // Combine record batches using the first batch's schema + let first_batch = record_batches.first().unwrap(); + let record_batch = + arrow_select::concat::concat_batches(&first_batch.schema(), &record_batches).unwrap(); + let mut columns = record_batch.columns().to_vec(); if let Some(sort_column) = sort_column { let column = record_batch.column_by_name(sort_column).unwrap(); diff --git a/crates/iceberg/src/inspect/snapshots.rs b/crates/iceberg/src/inspect/snapshots.rs index c2b079dda4..ee362ef93d 100644 --- a/crates/iceberg/src/inspect/snapshots.rs +++ b/crates/iceberg/src/inspect/snapshots.rs @@ -20,8 +20,11 @@ use std::sync::Arc; use arrow_array::builder::{MapBuilder, PrimitiveBuilder, StringBuilder}; use arrow_array::types::{Int64Type, TimestampMillisecondType}; use arrow_array::RecordBatch; -use arrow_schema::{DataType, Field, Schema, TimeUnit}; +use arrow_schema::{DataType, Field, Schema, SchemaRef, TimeUnit}; +use futures::StreamExt; +use crate::scan::ArrowRecordBatchStream; +use crate::spec::TableMetadata; use crate::table::Table; use crate::Result; @@ -70,7 +73,17 @@ impl<'a> SnapshotsTable<'a> { } /// Scans the snapshots table. - pub fn scan(&self) -> Result { + pub async fn scan(&self) -> Result { + let arrow_schema = Arc::new(self.schema()); + let table_metadata = self.table.metadata_ref(); + + Ok( + futures::stream::once(async move { Self::build_batch(arrow_schema, &table_metadata) }) + .boxed(), + ) + } + + fn build_batch(arrow_schema: SchemaRef, table_metadata: &TableMetadata) -> Result { let mut committed_at = PrimitiveBuilder::::new().with_timezone("+00:00"); let mut snapshot_id = PrimitiveBuilder::::new(); @@ -79,7 +92,7 @@ impl<'a> SnapshotsTable<'a> { let mut manifest_list = StringBuilder::new(); let mut summary = MapBuilder::new(None, StringBuilder::new(), StringBuilder::new()); - for snapshot in self.table.metadata().snapshots() { + for snapshot in table_metadata.snapshots() { committed_at.append_value(snapshot.timestamp_ms()); snapshot_id.append_value(snapshot.snapshot_id()); parent_id.append_option(snapshot.parent_snapshot_id()); @@ -92,7 +105,7 @@ impl<'a> SnapshotsTable<'a> { summary.append(true)?; } - Ok(RecordBatch::try_new(Arc::new(self.schema()), vec![ + Ok(RecordBatch::try_new(arrow_schema, vec![ Arc::new(committed_at.finish()), Arc::new(snapshot_id.finish()), Arc::new(parent_id.finish()), @@ -107,15 +120,17 @@ impl<'a> SnapshotsTable<'a> { mod tests { use expect_test::expect; - use crate::inspect::metadata_table::tests::check_record_batch; + use crate::inspect::metadata_table::tests::check_record_batches; use crate::scan::tests::TableTestFixture; - #[test] - fn test_snapshots_table() { + #[tokio::test] + async fn test_snapshots_table() { let table = TableTestFixture::new().table; - let record_batch = table.inspect().snapshots().scan().unwrap(); - check_record_batch( - record_batch, + + let batch_stream = table.inspect().snapshots().scan().await.unwrap(); + + check_record_batches( + batch_stream, expect![[r#" Field { name: "committed_at", data_type: Timestamp(Millisecond, Some("+00:00")), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "snapshot_id", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, @@ -178,6 +193,6 @@ mod tests { ]"#]], &["manifest_list"], Some("committed_at"), - ); + ).await; } } diff --git a/crates/iceberg/src/table.rs b/crates/iceberg/src/table.rs index b53990ed28..ebee670f45 100644 --- a/crates/iceberg/src/table.rs +++ b/crates/iceberg/src/table.rs @@ -203,7 +203,7 @@ impl Table { /// Creates a metadata table which provides table-like APIs for inspecting metadata. /// See [`MetadataTable`] for more details. - pub fn inspect(self) -> MetadataTable { + pub fn inspect(&self) -> MetadataTable<'_> { MetadataTable::new(self) } From 27a50689d8da9c46b1e6efecd28aa1ccee1378d6 Mon Sep 17 00:00:00 2001 From: Willi Raschkowski Date: Tue, 7 Jan 2025 10:53:52 +0000 Subject: [PATCH 2/5] Move 'build_batch' into 'scan' --- crates/iceberg/src/inspect/snapshots.rs | 67 ++++++++++++------------- 1 file changed, 31 insertions(+), 36 deletions(-) diff --git a/crates/iceberg/src/inspect/snapshots.rs b/crates/iceberg/src/inspect/snapshots.rs index ee362ef93d..191a909fe3 100644 --- a/crates/iceberg/src/inspect/snapshots.rs +++ b/crates/iceberg/src/inspect/snapshots.rs @@ -20,11 +20,10 @@ use std::sync::Arc; use arrow_array::builder::{MapBuilder, PrimitiveBuilder, StringBuilder}; use arrow_array::types::{Int64Type, TimestampMillisecondType}; use arrow_array::RecordBatch; -use arrow_schema::{DataType, Field, Schema, SchemaRef, TimeUnit}; +use arrow_schema::{DataType, Field, Schema, TimeUnit}; use futures::StreamExt; use crate::scan::ArrowRecordBatchStream; -use crate::spec::TableMetadata; use crate::table::Table; use crate::Result; @@ -77,42 +76,38 @@ impl<'a> SnapshotsTable<'a> { let arrow_schema = Arc::new(self.schema()); let table_metadata = self.table.metadata_ref(); - Ok( - futures::stream::once(async move { Self::build_batch(arrow_schema, &table_metadata) }) - .boxed(), - ) - } - - fn build_batch(arrow_schema: SchemaRef, table_metadata: &TableMetadata) -> Result { - let mut committed_at = - PrimitiveBuilder::::new().with_timezone("+00:00"); - let mut snapshot_id = PrimitiveBuilder::::new(); - let mut parent_id = PrimitiveBuilder::::new(); - let mut operation = StringBuilder::new(); - let mut manifest_list = StringBuilder::new(); - let mut summary = MapBuilder::new(None, StringBuilder::new(), StringBuilder::new()); - - for snapshot in table_metadata.snapshots() { - committed_at.append_value(snapshot.timestamp_ms()); - snapshot_id.append_value(snapshot.snapshot_id()); - parent_id.append_option(snapshot.parent_snapshot_id()); - manifest_list.append_value(snapshot.manifest_list()); - operation.append_value(snapshot.summary().operation.as_str()); - for (key, value) in &snapshot.summary().additional_properties { - summary.keys().append_value(key); - summary.values().append_value(value); + Ok(futures::stream::once(async move { + let mut committed_at = + PrimitiveBuilder::::new().with_timezone("+00:00"); + let mut snapshot_id = PrimitiveBuilder::::new(); + let mut parent_id = PrimitiveBuilder::::new(); + let mut operation = StringBuilder::new(); + let mut manifest_list = StringBuilder::new(); + let mut summary = MapBuilder::new(None, StringBuilder::new(), StringBuilder::new()); + + for snapshot in table_metadata.snapshots() { + committed_at.append_value(snapshot.timestamp_ms()); + snapshot_id.append_value(snapshot.snapshot_id()); + parent_id.append_option(snapshot.parent_snapshot_id()); + manifest_list.append_value(snapshot.manifest_list()); + operation.append_value(snapshot.summary().operation.as_str()); + for (key, value) in &snapshot.summary().additional_properties { + summary.keys().append_value(key); + summary.values().append_value(value); + } + summary.append(true)?; } - summary.append(true)?; - } - Ok(RecordBatch::try_new(arrow_schema, vec![ - Arc::new(committed_at.finish()), - Arc::new(snapshot_id.finish()), - Arc::new(parent_id.finish()), - Arc::new(operation.finish()), - Arc::new(manifest_list.finish()), - Arc::new(summary.finish()), - ])?) + Ok(RecordBatch::try_new(arrow_schema, vec![ + Arc::new(committed_at.finish()), + Arc::new(snapshot_id.finish()), + Arc::new(parent_id.finish()), + Arc::new(operation.finish()), + Arc::new(manifest_list.finish()), + Arc::new(summary.finish()), + ])?) + }) + .boxed()) } } From c7aecedd8665ee6895688e9062c43347df5412e9 Mon Sep 17 00:00:00 2001 From: Willi Raschkowski Date: Tue, 7 Jan 2025 11:14:08 +0000 Subject: [PATCH 3/5] Use 'try_stream!'? --- Cargo.lock | 27 +++++++++++++++++++++++-- crates/iceberg/Cargo.toml | 1 + crates/iceberg/src/inspect/snapshots.rs | 9 +++++---- 3 files changed, 31 insertions(+), 6 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 04265323d3..df689d9b9c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -587,6 +587,28 @@ dependencies = [ "wasm-bindgen-futures", ] +[[package]] +name = "async-stream" +version = "0.3.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b5a71a6f37880a80d1d7f19efd781e4b5de42c88f0722cc13bcb6cc2cfe8476" +dependencies = [ + "async-stream-impl", + "futures-core", + "pin-project-lite", +] + +[[package]] +name = "async-stream-impl" +version = "0.3.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c7c24de15d275a1ecfd47a380fb4d5ec9bfe0933f309ed5e705b775596a3574d" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.92", +] + [[package]] name = "async-task" version = "4.7.1" @@ -1374,7 +1396,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "117725a109d387c937a1533ce01b450cbde6b88abceea8473c4d7a85853cda3c" dependencies = [ "lazy_static", - "windows-sys 0.48.0", + "windows-sys 0.59.0", ] [[package]] @@ -2913,6 +2935,7 @@ dependencies = [ "arrow-select", "arrow-string", "async-std", + "async-stream", "async-trait", "bimap", "bitvec", @@ -6649,7 +6672,7 @@ version = "0.1.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cf221c93e13a30d793f7645a0e7762c55d169dbb0a49671918a2319d289b10bb" dependencies = [ - "windows-sys 0.48.0", + "windows-sys 0.59.0", ] [[package]] diff --git a/crates/iceberg/Cargo.toml b/crates/iceberg/Cargo.toml index 97e77a2c59..26445a9da4 100644 --- a/crates/iceberg/Cargo.toml +++ b/crates/iceberg/Cargo.toml @@ -52,6 +52,7 @@ arrow-schema = { workspace = true } arrow-select = { workspace = true } arrow-string = { workspace = true } async-std = { workspace = true, optional = true, features = ["attributes"] } +async-stream = { workspace = true } async-trait = { workspace = true } bimap = { workspace = true } bitvec = { workspace = true } diff --git a/crates/iceberg/src/inspect/snapshots.rs b/crates/iceberg/src/inspect/snapshots.rs index 191a909fe3..90c6ecc229 100644 --- a/crates/iceberg/src/inspect/snapshots.rs +++ b/crates/iceberg/src/inspect/snapshots.rs @@ -21,6 +21,7 @@ use arrow_array::builder::{MapBuilder, PrimitiveBuilder, StringBuilder}; use arrow_array::types::{Int64Type, TimestampMillisecondType}; use arrow_array::RecordBatch; use arrow_schema::{DataType, Field, Schema, TimeUnit}; +use async_stream::try_stream; use futures::StreamExt; use crate::scan::ArrowRecordBatchStream; @@ -76,7 +77,7 @@ impl<'a> SnapshotsTable<'a> { let arrow_schema = Arc::new(self.schema()); let table_metadata = self.table.metadata_ref(); - Ok(futures::stream::once(async move { + Ok(try_stream! { let mut committed_at = PrimitiveBuilder::::new().with_timezone("+00:00"); let mut snapshot_id = PrimitiveBuilder::::new(); @@ -98,15 +99,15 @@ impl<'a> SnapshotsTable<'a> { summary.append(true)?; } - Ok(RecordBatch::try_new(arrow_schema, vec![ + yield RecordBatch::try_new(arrow_schema, vec![ Arc::new(committed_at.finish()), Arc::new(snapshot_id.finish()), Arc::new(parent_id.finish()), Arc::new(operation.finish()), Arc::new(manifest_list.finish()), Arc::new(summary.finish()), - ])?) - }) + ])?; + } .boxed()) } } From 82832881f507b9a56e94cf4b22191867b1fadb78 Mon Sep 17 00:00:00 2001 From: Willi Raschkowski Date: Tue, 7 Jan 2025 11:24:23 +0000 Subject: [PATCH 4/5] try_stream! and inline batch-building for manifests as well --- crates/iceberg/src/inspect/manifests.rs | 167 +++++++++++------------- 1 file changed, 79 insertions(+), 88 deletions(-) diff --git a/crates/iceberg/src/inspect/manifests.rs b/crates/iceberg/src/inspect/manifests.rs index 627af00cc9..4504668bb4 100644 --- a/crates/iceberg/src/inspect/manifests.rs +++ b/crates/iceberg/src/inspect/manifests.rs @@ -22,12 +22,11 @@ use arrow_array::builder::{ }; use arrow_array::types::{Int32Type, Int64Type, Int8Type}; use arrow_array::RecordBatch; -use arrow_schema::{DataType, Field, Fields, Schema, SchemaRef}; +use arrow_schema::{DataType, Field, Fields, Schema}; +use async_stream::try_stream; use futures::StreamExt; -use crate::io::FileIO; use crate::scan::ArrowRecordBatchStream; -use crate::spec::TableMetadata; use crate::table::Table; use crate::Result; @@ -83,96 +82,88 @@ impl<'a> ManifestsTable<'a> { let table_metadata = self.table.metadata_ref(); let file_io = self.table.file_io().clone(); - Ok(futures::stream::once(async move { - Self::build_batch(arrow_schema, &table_metadata, &file_io).await - }) - .boxed()) - } - - async fn build_batch( - arrow_schema: SchemaRef, - table_metadata: &TableMetadata, - file_io: &FileIO, - ) -> Result { - let mut content = PrimitiveBuilder::::new(); - let mut path = StringBuilder::new(); - let mut length = PrimitiveBuilder::::new(); - let mut partition_spec_id = PrimitiveBuilder::::new(); - let mut added_snapshot_id = PrimitiveBuilder::::new(); - let mut added_data_files_count = PrimitiveBuilder::::new(); - let mut existing_data_files_count = PrimitiveBuilder::::new(); - let mut deleted_data_files_count = PrimitiveBuilder::::new(); - let mut added_delete_files_count = PrimitiveBuilder::::new(); - let mut existing_delete_files_count = PrimitiveBuilder::::new(); - let mut deleted_delete_files_count = PrimitiveBuilder::::new(); - let mut partition_summaries = ListBuilder::new(StructBuilder::from_fields( - Fields::from(Self::partition_summary_fields()), - 0, - )) - .with_field(Arc::new(Field::new_struct( - "item", - Self::partition_summary_fields(), - false, - ))); + Ok(try_stream! { + let mut content = PrimitiveBuilder::::new(); + let mut path = StringBuilder::new(); + let mut length = PrimitiveBuilder::::new(); + let mut partition_spec_id = PrimitiveBuilder::::new(); + let mut added_snapshot_id = PrimitiveBuilder::::new(); + let mut added_data_files_count = PrimitiveBuilder::::new(); + let mut existing_data_files_count = PrimitiveBuilder::::new(); + let mut deleted_data_files_count = PrimitiveBuilder::::new(); + let mut added_delete_files_count = PrimitiveBuilder::::new(); + let mut existing_delete_files_count = PrimitiveBuilder::::new(); + let mut deleted_delete_files_count = PrimitiveBuilder::::new(); + let mut partition_summaries = ListBuilder::new(StructBuilder::from_fields( + Fields::from(Self::partition_summary_fields()), + 0, + )) + .with_field(Arc::new(Field::new_struct( + "item", + Self::partition_summary_fields(), + false, + ))); - if let Some(snapshot) = table_metadata.current_snapshot() { - let manifest_list = snapshot.load_manifest_list(file_io, table_metadata).await?; - for manifest in manifest_list.entries() { - content.append_value(manifest.content as i8); - path.append_value(manifest.manifest_path.clone()); - length.append_value(manifest.manifest_length); - partition_spec_id.append_value(manifest.partition_spec_id); - added_snapshot_id.append_value(manifest.added_snapshot_id); - added_data_files_count.append_value(manifest.added_files_count.unwrap_or(0) as i32); - existing_data_files_count - .append_value(manifest.existing_files_count.unwrap_or(0) as i32); - deleted_data_files_count - .append_value(manifest.deleted_files_count.unwrap_or(0) as i32); - added_delete_files_count - .append_value(manifest.added_files_count.unwrap_or(0) as i32); - existing_delete_files_count - .append_value(manifest.existing_files_count.unwrap_or(0) as i32); - deleted_delete_files_count - .append_value(manifest.deleted_files_count.unwrap_or(0) as i32); + if let Some(snapshot) = table_metadata.current_snapshot() { + let manifest_list = snapshot.load_manifest_list(&file_io, &table_metadata).await?; + for manifest in manifest_list.entries() { + content.append_value(manifest.content as i8); + path.append_value(manifest.manifest_path.clone()); + length.append_value(manifest.manifest_length); + partition_spec_id.append_value(manifest.partition_spec_id); + added_snapshot_id.append_value(manifest.added_snapshot_id); + added_data_files_count.append_value(manifest.added_files_count.unwrap_or(0) as i32); + existing_data_files_count + .append_value(manifest.existing_files_count.unwrap_or(0) as i32); + deleted_data_files_count + .append_value(manifest.deleted_files_count.unwrap_or(0) as i32); + added_delete_files_count + .append_value(manifest.added_files_count.unwrap_or(0) as i32); + existing_delete_files_count + .append_value(manifest.existing_files_count.unwrap_or(0) as i32); + deleted_delete_files_count + .append_value(manifest.deleted_files_count.unwrap_or(0) as i32); - let partition_summaries_builder = partition_summaries.values(); - for summary in &manifest.partitions { - partition_summaries_builder - .field_builder::(0) - .unwrap() - .append_value(summary.contains_null); - partition_summaries_builder - .field_builder::(1) - .unwrap() - .append_option(summary.contains_nan); - partition_summaries_builder - .field_builder::(2) - .unwrap() - .append_option(summary.lower_bound.as_ref().map(|v| v.to_string())); - partition_summaries_builder - .field_builder::(3) - .unwrap() - .append_option(summary.upper_bound.as_ref().map(|v| v.to_string())); - partition_summaries_builder.append(true); + let partition_summaries_builder = partition_summaries.values(); + for summary in &manifest.partitions { + partition_summaries_builder + .field_builder::(0) + .unwrap() + .append_value(summary.contains_null); + partition_summaries_builder + .field_builder::(1) + .unwrap() + .append_option(summary.contains_nan); + partition_summaries_builder + .field_builder::(2) + .unwrap() + .append_option(summary.lower_bound.as_ref().map(|v| v.to_string())); + partition_summaries_builder + .field_builder::(3) + .unwrap() + .append_option(summary.upper_bound.as_ref().map(|v| v.to_string())); + partition_summaries_builder.append(true); + } + partition_summaries.append(true); } - partition_summaries.append(true); } - } - Ok(RecordBatch::try_new(arrow_schema, vec![ - Arc::new(content.finish()), - Arc::new(path.finish()), - Arc::new(length.finish()), - Arc::new(partition_spec_id.finish()), - Arc::new(added_snapshot_id.finish()), - Arc::new(added_data_files_count.finish()), - Arc::new(existing_data_files_count.finish()), - Arc::new(deleted_data_files_count.finish()), - Arc::new(added_delete_files_count.finish()), - Arc::new(existing_delete_files_count.finish()), - Arc::new(deleted_delete_files_count.finish()), - Arc::new(partition_summaries.finish()), - ])?) + yield RecordBatch::try_new(arrow_schema, vec![ + Arc::new(content.finish()), + Arc::new(path.finish()), + Arc::new(length.finish()), + Arc::new(partition_spec_id.finish()), + Arc::new(added_snapshot_id.finish()), + Arc::new(added_data_files_count.finish()), + Arc::new(existing_data_files_count.finish()), + Arc::new(deleted_data_files_count.finish()), + Arc::new(added_delete_files_count.finish()), + Arc::new(existing_delete_files_count.finish()), + Arc::new(deleted_delete_files_count.finish()), + Arc::new(partition_summaries.finish()), + ])?; + } + .boxed()) } } From e67f809c14f335dac8e7b5a98d6f13f1ff4e6fbf Mon Sep 17 00:00:00 2001 From: Willi Raschkowski Date: Tue, 7 Jan 2025 14:13:49 +0000 Subject: [PATCH 5/5] Build batches scan before returning stream --- Cargo.lock | 23 ---- crates/iceberg/Cargo.toml | 1 - crates/iceberg/src/inspect/manifests.rs | 162 ++++++++++++------------ crates/iceberg/src/inspect/snapshots.rs | 63 +++++---- 4 files changed, 108 insertions(+), 141 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index df689d9b9c..4e5e796be9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -587,28 +587,6 @@ dependencies = [ "wasm-bindgen-futures", ] -[[package]] -name = "async-stream" -version = "0.3.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0b5a71a6f37880a80d1d7f19efd781e4b5de42c88f0722cc13bcb6cc2cfe8476" -dependencies = [ - "async-stream-impl", - "futures-core", - "pin-project-lite", -] - -[[package]] -name = "async-stream-impl" -version = "0.3.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c7c24de15d275a1ecfd47a380fb4d5ec9bfe0933f309ed5e705b775596a3574d" -dependencies = [ - "proc-macro2", - "quote", - "syn 2.0.92", -] - [[package]] name = "async-task" version = "4.7.1" @@ -2935,7 +2913,6 @@ dependencies = [ "arrow-select", "arrow-string", "async-std", - "async-stream", "async-trait", "bimap", "bitvec", diff --git a/crates/iceberg/Cargo.toml b/crates/iceberg/Cargo.toml index 26445a9da4..97e77a2c59 100644 --- a/crates/iceberg/Cargo.toml +++ b/crates/iceberg/Cargo.toml @@ -52,7 +52,6 @@ arrow-schema = { workspace = true } arrow-select = { workspace = true } arrow-string = { workspace = true } async-std = { workspace = true, optional = true, features = ["attributes"] } -async-stream = { workspace = true } async-trait = { workspace = true } bimap = { workspace = true } bitvec = { workspace = true } diff --git a/crates/iceberg/src/inspect/manifests.rs b/crates/iceberg/src/inspect/manifests.rs index 4504668bb4..ab63d2f6ee 100644 --- a/crates/iceberg/src/inspect/manifests.rs +++ b/crates/iceberg/src/inspect/manifests.rs @@ -23,8 +23,7 @@ use arrow_array::builder::{ use arrow_array::types::{Int32Type, Int64Type, Int8Type}; use arrow_array::RecordBatch; use arrow_schema::{DataType, Field, Fields, Schema}; -use async_stream::try_stream; -use futures::StreamExt; +use futures::{stream, StreamExt}; use crate::scan::ArrowRecordBatchStream; use crate::table::Table; @@ -78,92 +77,89 @@ impl<'a> ManifestsTable<'a> { /// Scans the manifests table. pub async fn scan(&self) -> Result { - let arrow_schema = Arc::new(self.schema()); - let table_metadata = self.table.metadata_ref(); - let file_io = self.table.file_io().clone(); + let mut content = PrimitiveBuilder::::new(); + let mut path = StringBuilder::new(); + let mut length = PrimitiveBuilder::::new(); + let mut partition_spec_id = PrimitiveBuilder::::new(); + let mut added_snapshot_id = PrimitiveBuilder::::new(); + let mut added_data_files_count = PrimitiveBuilder::::new(); + let mut existing_data_files_count = PrimitiveBuilder::::new(); + let mut deleted_data_files_count = PrimitiveBuilder::::new(); + let mut added_delete_files_count = PrimitiveBuilder::::new(); + let mut existing_delete_files_count = PrimitiveBuilder::::new(); + let mut deleted_delete_files_count = PrimitiveBuilder::::new(); + let mut partition_summaries = ListBuilder::new(StructBuilder::from_fields( + Fields::from(Self::partition_summary_fields()), + 0, + )) + .with_field(Arc::new(Field::new_struct( + "item", + Self::partition_summary_fields(), + false, + ))); - Ok(try_stream! { - let mut content = PrimitiveBuilder::::new(); - let mut path = StringBuilder::new(); - let mut length = PrimitiveBuilder::::new(); - let mut partition_spec_id = PrimitiveBuilder::::new(); - let mut added_snapshot_id = PrimitiveBuilder::::new(); - let mut added_data_files_count = PrimitiveBuilder::::new(); - let mut existing_data_files_count = PrimitiveBuilder::::new(); - let mut deleted_data_files_count = PrimitiveBuilder::::new(); - let mut added_delete_files_count = PrimitiveBuilder::::new(); - let mut existing_delete_files_count = PrimitiveBuilder::::new(); - let mut deleted_delete_files_count = PrimitiveBuilder::::new(); - let mut partition_summaries = ListBuilder::new(StructBuilder::from_fields( - Fields::from(Self::partition_summary_fields()), - 0, - )) - .with_field(Arc::new(Field::new_struct( - "item", - Self::partition_summary_fields(), - false, - ))); - - if let Some(snapshot) = table_metadata.current_snapshot() { - let manifest_list = snapshot.load_manifest_list(&file_io, &table_metadata).await?; - for manifest in manifest_list.entries() { - content.append_value(manifest.content as i8); - path.append_value(manifest.manifest_path.clone()); - length.append_value(manifest.manifest_length); - partition_spec_id.append_value(manifest.partition_spec_id); - added_snapshot_id.append_value(manifest.added_snapshot_id); - added_data_files_count.append_value(manifest.added_files_count.unwrap_or(0) as i32); - existing_data_files_count - .append_value(manifest.existing_files_count.unwrap_or(0) as i32); - deleted_data_files_count - .append_value(manifest.deleted_files_count.unwrap_or(0) as i32); - added_delete_files_count - .append_value(manifest.added_files_count.unwrap_or(0) as i32); - existing_delete_files_count - .append_value(manifest.existing_files_count.unwrap_or(0) as i32); - deleted_delete_files_count - .append_value(manifest.deleted_files_count.unwrap_or(0) as i32); + if let Some(snapshot) = self.table.metadata().current_snapshot() { + let manifest_list = snapshot + .load_manifest_list(self.table.file_io(), &self.table.metadata_ref()) + .await?; + for manifest in manifest_list.entries() { + content.append_value(manifest.content as i8); + path.append_value(manifest.manifest_path.clone()); + length.append_value(manifest.manifest_length); + partition_spec_id.append_value(manifest.partition_spec_id); + added_snapshot_id.append_value(manifest.added_snapshot_id); + added_data_files_count.append_value(manifest.added_files_count.unwrap_or(0) as i32); + existing_data_files_count + .append_value(manifest.existing_files_count.unwrap_or(0) as i32); + deleted_data_files_count + .append_value(manifest.deleted_files_count.unwrap_or(0) as i32); + added_delete_files_count + .append_value(manifest.added_files_count.unwrap_or(0) as i32); + existing_delete_files_count + .append_value(manifest.existing_files_count.unwrap_or(0) as i32); + deleted_delete_files_count + .append_value(manifest.deleted_files_count.unwrap_or(0) as i32); - let partition_summaries_builder = partition_summaries.values(); - for summary in &manifest.partitions { - partition_summaries_builder - .field_builder::(0) - .unwrap() - .append_value(summary.contains_null); - partition_summaries_builder - .field_builder::(1) - .unwrap() - .append_option(summary.contains_nan); - partition_summaries_builder - .field_builder::(2) - .unwrap() - .append_option(summary.lower_bound.as_ref().map(|v| v.to_string())); - partition_summaries_builder - .field_builder::(3) - .unwrap() - .append_option(summary.upper_bound.as_ref().map(|v| v.to_string())); - partition_summaries_builder.append(true); - } - partition_summaries.append(true); + let partition_summaries_builder = partition_summaries.values(); + for summary in &manifest.partitions { + partition_summaries_builder + .field_builder::(0) + .unwrap() + .append_value(summary.contains_null); + partition_summaries_builder + .field_builder::(1) + .unwrap() + .append_option(summary.contains_nan); + partition_summaries_builder + .field_builder::(2) + .unwrap() + .append_option(summary.lower_bound.as_ref().map(|v| v.to_string())); + partition_summaries_builder + .field_builder::(3) + .unwrap() + .append_option(summary.upper_bound.as_ref().map(|v| v.to_string())); + partition_summaries_builder.append(true); } + partition_summaries.append(true); } - - yield RecordBatch::try_new(arrow_schema, vec![ - Arc::new(content.finish()), - Arc::new(path.finish()), - Arc::new(length.finish()), - Arc::new(partition_spec_id.finish()), - Arc::new(added_snapshot_id.finish()), - Arc::new(added_data_files_count.finish()), - Arc::new(existing_data_files_count.finish()), - Arc::new(deleted_data_files_count.finish()), - Arc::new(added_delete_files_count.finish()), - Arc::new(existing_delete_files_count.finish()), - Arc::new(deleted_delete_files_count.finish()), - Arc::new(partition_summaries.finish()), - ])?; } - .boxed()) + + let batch = RecordBatch::try_new(Arc::new(self.schema()), vec![ + Arc::new(content.finish()), + Arc::new(path.finish()), + Arc::new(length.finish()), + Arc::new(partition_spec_id.finish()), + Arc::new(added_snapshot_id.finish()), + Arc::new(added_data_files_count.finish()), + Arc::new(existing_data_files_count.finish()), + Arc::new(deleted_data_files_count.finish()), + Arc::new(added_delete_files_count.finish()), + Arc::new(existing_delete_files_count.finish()), + Arc::new(deleted_delete_files_count.finish()), + Arc::new(partition_summaries.finish()), + ])?; + + Ok(stream::iter(vec![Ok(batch)]).boxed()) } } diff --git a/crates/iceberg/src/inspect/snapshots.rs b/crates/iceberg/src/inspect/snapshots.rs index 90c6ecc229..1ee89963d6 100644 --- a/crates/iceberg/src/inspect/snapshots.rs +++ b/crates/iceberg/src/inspect/snapshots.rs @@ -21,8 +21,7 @@ use arrow_array::builder::{MapBuilder, PrimitiveBuilder, StringBuilder}; use arrow_array::types::{Int64Type, TimestampMillisecondType}; use arrow_array::RecordBatch; use arrow_schema::{DataType, Field, Schema, TimeUnit}; -use async_stream::try_stream; -use futures::StreamExt; +use futures::{stream, StreamExt}; use crate::scan::ArrowRecordBatchStream; use crate::table::Table; @@ -74,41 +73,37 @@ impl<'a> SnapshotsTable<'a> { /// Scans the snapshots table. pub async fn scan(&self) -> Result { - let arrow_schema = Arc::new(self.schema()); - let table_metadata = self.table.metadata_ref(); + let mut committed_at = + PrimitiveBuilder::::new().with_timezone("+00:00"); + let mut snapshot_id = PrimitiveBuilder::::new(); + let mut parent_id = PrimitiveBuilder::::new(); + let mut operation = StringBuilder::new(); + let mut manifest_list = StringBuilder::new(); + let mut summary = MapBuilder::new(None, StringBuilder::new(), StringBuilder::new()); - Ok(try_stream! { - let mut committed_at = - PrimitiveBuilder::::new().with_timezone("+00:00"); - let mut snapshot_id = PrimitiveBuilder::::new(); - let mut parent_id = PrimitiveBuilder::::new(); - let mut operation = StringBuilder::new(); - let mut manifest_list = StringBuilder::new(); - let mut summary = MapBuilder::new(None, StringBuilder::new(), StringBuilder::new()); - - for snapshot in table_metadata.snapshots() { - committed_at.append_value(snapshot.timestamp_ms()); - snapshot_id.append_value(snapshot.snapshot_id()); - parent_id.append_option(snapshot.parent_snapshot_id()); - manifest_list.append_value(snapshot.manifest_list()); - operation.append_value(snapshot.summary().operation.as_str()); - for (key, value) in &snapshot.summary().additional_properties { - summary.keys().append_value(key); - summary.values().append_value(value); - } - summary.append(true)?; + for snapshot in self.table.metadata().snapshots() { + committed_at.append_value(snapshot.timestamp_ms()); + snapshot_id.append_value(snapshot.snapshot_id()); + parent_id.append_option(snapshot.parent_snapshot_id()); + manifest_list.append_value(snapshot.manifest_list()); + operation.append_value(snapshot.summary().operation.as_str()); + for (key, value) in &snapshot.summary().additional_properties { + summary.keys().append_value(key); + summary.values().append_value(value); } - - yield RecordBatch::try_new(arrow_schema, vec![ - Arc::new(committed_at.finish()), - Arc::new(snapshot_id.finish()), - Arc::new(parent_id.finish()), - Arc::new(operation.finish()), - Arc::new(manifest_list.finish()), - Arc::new(summary.finish()), - ])?; + summary.append(true)?; } - .boxed()) + + let batch = RecordBatch::try_new(Arc::new(self.schema()), vec![ + Arc::new(committed_at.finish()), + Arc::new(snapshot_id.finish()), + Arc::new(parent_id.finish()), + Arc::new(operation.finish()), + Arc::new(manifest_list.finish()), + Arc::new(summary.finish()), + ])?; + + Ok(stream::iter(vec![Ok(batch)]).boxed()) } }