Skip to content

Commit 6e07faa

Browse files
authored
Metadata table scans as streams (#870)
1 parent e34f428 commit 6e07faa

File tree

5 files changed

+53
-33
lines changed

5 files changed

+53
-33
lines changed

Cargo.lock

Lines changed: 2 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/iceberg/src/inspect/manifests.rs

Lines changed: 16 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,9 @@ use arrow_array::builder::{
2323
use arrow_array::types::{Int32Type, Int64Type, Int8Type};
2424
use arrow_array::RecordBatch;
2525
use arrow_schema::{DataType, Field, Fields, Schema};
26+
use futures::{stream, StreamExt};
2627

28+
use crate::scan::ArrowRecordBatchStream;
2729
use crate::table::Table;
2830
use crate::Result;
2931

@@ -38,7 +40,7 @@ impl<'a> ManifestsTable<'a> {
3840
Self { table }
3941
}
4042

41-
fn partition_summary_fields(&self) -> Vec<Field> {
43+
fn partition_summary_fields() -> Vec<Field> {
4244
vec![
4345
Field::new("contains_null", DataType::Boolean, false),
4446
Field::new("contains_nan", DataType::Boolean, true),
@@ -65,7 +67,7 @@ impl<'a> ManifestsTable<'a> {
6567
"partition_summaries",
6668
DataType::List(Arc::new(Field::new_struct(
6769
"item",
68-
self.partition_summary_fields(),
70+
Self::partition_summary_fields(),
6971
false,
7072
))),
7173
false,
@@ -74,7 +76,7 @@ impl<'a> ManifestsTable<'a> {
7476
}
7577

7678
/// Scans the manifests table.
77-
pub async fn scan(&self) -> Result<RecordBatch> {
79+
pub async fn scan(&self) -> Result<ArrowRecordBatchStream> {
7880
let mut content = PrimitiveBuilder::<Int8Type>::new();
7981
let mut path = StringBuilder::new();
8082
let mut length = PrimitiveBuilder::<Int64Type>::new();
@@ -87,12 +89,12 @@ impl<'a> ManifestsTable<'a> {
8789
let mut existing_delete_files_count = PrimitiveBuilder::<Int32Type>::new();
8890
let mut deleted_delete_files_count = PrimitiveBuilder::<Int32Type>::new();
8991
let mut partition_summaries = ListBuilder::new(StructBuilder::from_fields(
90-
Fields::from(self.partition_summary_fields()),
92+
Fields::from(Self::partition_summary_fields()),
9193
0,
9294
))
9395
.with_field(Arc::new(Field::new_struct(
9496
"item",
95-
self.partition_summary_fields(),
97+
Self::partition_summary_fields(),
9698
false,
9799
)));
98100

@@ -142,7 +144,7 @@ impl<'a> ManifestsTable<'a> {
142144
}
143145
}
144146

145-
Ok(RecordBatch::try_new(Arc::new(self.schema()), vec![
147+
let batch = RecordBatch::try_new(Arc::new(self.schema()), vec![
146148
Arc::new(content.finish()),
147149
Arc::new(path.finish()),
148150
Arc::new(length.finish()),
@@ -155,26 +157,28 @@ impl<'a> ManifestsTable<'a> {
155157
Arc::new(existing_delete_files_count.finish()),
156158
Arc::new(deleted_delete_files_count.finish()),
157159
Arc::new(partition_summaries.finish()),
158-
])?)
160+
])?;
161+
162+
Ok(stream::iter(vec![Ok(batch)]).boxed())
159163
}
160164
}
161165

162166
#[cfg(test)]
163167
mod tests {
164168
use expect_test::expect;
165169

166-
use crate::inspect::metadata_table::tests::check_record_batch;
170+
use crate::inspect::metadata_table::tests::check_record_batches;
167171
use crate::scan::tests::TableTestFixture;
168172

169173
#[tokio::test]
170174
async fn test_manifests_table() {
171175
let mut fixture = TableTestFixture::new();
172176
fixture.setup_manifest_files().await;
173177

174-
let record_batch = fixture.table.inspect().manifests().scan().await.unwrap();
178+
let batch_stream = fixture.table.inspect().manifests().scan().await.unwrap();
175179

176-
check_record_batch(
177-
record_batch,
180+
check_record_batches(
181+
batch_stream,
178182
expect![[r#"
179183
Field { name: "content", data_type: Int8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} },
180184
Field { name: "path", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} },
@@ -259,6 +263,6 @@ mod tests {
259263
]"#]],
260264
&["path", "length"],
261265
Some("path"),
262-
);
266+
).await;
263267
}
264268
}

crates/iceberg/src/inspect/metadata_table.rs

Lines changed: 18 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -25,31 +25,33 @@ use crate::table::Table;
2525
/// - <https://iceberg.apache.org/docs/latest/spark-queries/#querying-with-sql>
2626
/// - <https://py.iceberg.apache.org/api/#inspecting-tables>
2727
#[derive(Debug)]
28-
pub struct MetadataTable(Table);
28+
pub struct MetadataTable<'a>(&'a Table);
2929

30-
impl MetadataTable {
30+
impl<'a> MetadataTable<'a> {
3131
/// Creates a new metadata scan.
32-
pub fn new(table: Table) -> Self {
32+
pub fn new(table: &'a Table) -> Self {
3333
Self(table)
3434
}
3535

3636
/// Get the snapshots table.
3737
pub fn snapshots(&self) -> SnapshotsTable {
38-
SnapshotsTable::new(&self.0)
38+
SnapshotsTable::new(self.0)
3939
}
4040

4141
/// Get the manifests table.
4242
pub fn manifests(&self) -> ManifestsTable {
43-
ManifestsTable::new(&self.0)
43+
ManifestsTable::new(self.0)
4444
}
4545
}
4646

4747
#[cfg(test)]
4848
pub mod tests {
49-
use arrow_array::RecordBatch;
5049
use expect_test::Expect;
50+
use futures::TryStreamExt;
5151
use itertools::Itertools;
5252

53+
use crate::scan::ArrowRecordBatchStream;
54+
5355
/// Snapshot testing to check the resulting record batch.
5456
///
5557
/// - `expected_schema/data`: put `expect![[""]]` as a placeholder,
@@ -58,13 +60,21 @@ pub mod tests {
5860
/// Check the doc of [`expect_test`] for more details.
5961
/// - `ignore_check_columns`: Some columns are not stable, so we can skip them.
6062
/// - `sort_column`: The order of the data might be non-deterministic, so we can sort it by a column.
61-
pub fn check_record_batch(
62-
record_batch: RecordBatch,
63+
pub async fn check_record_batches(
64+
batch_stream: ArrowRecordBatchStream,
6365
expected_schema: Expect,
6466
expected_data: Expect,
6567
ignore_check_columns: &[&str],
6668
sort_column: Option<&str>,
6769
) {
70+
let record_batches = batch_stream.try_collect::<Vec<_>>().await.unwrap();
71+
assert!(!record_batches.is_empty(), "Empty record batches");
72+
73+
// Combine record batches using the first batch's schema
74+
let first_batch = record_batches.first().unwrap();
75+
let record_batch =
76+
arrow_select::concat::concat_batches(&first_batch.schema(), &record_batches).unwrap();
77+
6878
let mut columns = record_batch.columns().to_vec();
6979
if let Some(sort_column) = sort_column {
7080
let column = record_batch.column_by_name(sort_column).unwrap();

crates/iceberg/src/inspect/snapshots.rs

Lines changed: 16 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,9 @@ use arrow_array::builder::{MapBuilder, PrimitiveBuilder, StringBuilder};
2121
use arrow_array::types::{Int64Type, TimestampMillisecondType};
2222
use arrow_array::RecordBatch;
2323
use arrow_schema::{DataType, Field, Schema, TimeUnit};
24+
use futures::{stream, StreamExt};
2425

26+
use crate::scan::ArrowRecordBatchStream;
2527
use crate::table::Table;
2628
use crate::Result;
2729

@@ -70,7 +72,7 @@ impl<'a> SnapshotsTable<'a> {
7072
}
7173

7274
/// Scans the snapshots table.
73-
pub fn scan(&self) -> Result<RecordBatch> {
75+
pub async fn scan(&self) -> Result<ArrowRecordBatchStream> {
7476
let mut committed_at =
7577
PrimitiveBuilder::<TimestampMillisecondType>::new().with_timezone("+00:00");
7678
let mut snapshot_id = PrimitiveBuilder::<Int64Type>::new();
@@ -92,30 +94,34 @@ impl<'a> SnapshotsTable<'a> {
9294
summary.append(true)?;
9395
}
9496

95-
Ok(RecordBatch::try_new(Arc::new(self.schema()), vec![
97+
let batch = RecordBatch::try_new(Arc::new(self.schema()), vec![
9698
Arc::new(committed_at.finish()),
9799
Arc::new(snapshot_id.finish()),
98100
Arc::new(parent_id.finish()),
99101
Arc::new(operation.finish()),
100102
Arc::new(manifest_list.finish()),
101103
Arc::new(summary.finish()),
102-
])?)
104+
])?;
105+
106+
Ok(stream::iter(vec![Ok(batch)]).boxed())
103107
}
104108
}
105109

106110
#[cfg(test)]
107111
mod tests {
108112
use expect_test::expect;
109113

110-
use crate::inspect::metadata_table::tests::check_record_batch;
114+
use crate::inspect::metadata_table::tests::check_record_batches;
111115
use crate::scan::tests::TableTestFixture;
112116

113-
#[test]
114-
fn test_snapshots_table() {
117+
#[tokio::test]
118+
async fn test_snapshots_table() {
115119
let table = TableTestFixture::new().table;
116-
let record_batch = table.inspect().snapshots().scan().unwrap();
117-
check_record_batch(
118-
record_batch,
120+
121+
let batch_stream = table.inspect().snapshots().scan().await.unwrap();
122+
123+
check_record_batches(
124+
batch_stream,
119125
expect![[r#"
120126
Field { name: "committed_at", data_type: Timestamp(Millisecond, Some("+00:00")), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} },
121127
Field { name: "snapshot_id", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} },
@@ -178,6 +184,6 @@ mod tests {
178184
]"#]],
179185
&["manifest_list"],
180186
Some("committed_at"),
181-
);
187+
).await;
182188
}
183189
}

crates/iceberg/src/table.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -203,7 +203,7 @@ impl Table {
203203

204204
/// Creates a metadata table which provides table-like APIs for inspecting metadata.
205205
/// See [`MetadataTable`] for more details.
206-
pub fn inspect(self) -> MetadataTable {
206+
pub fn inspect(&self) -> MetadataTable<'_> {
207207
MetadataTable::new(self)
208208
}
209209

0 commit comments

Comments
 (0)