From 7408b05d453bd1062f53bcb9364df73e42895245 Mon Sep 17 00:00:00 2001 From: xxchan Date: Wed, 18 Dec 2024 21:07:35 +0800 Subject: [PATCH 1/8] feat: support metadata table "snapshots" Signed-off-by: xxchan --- Cargo.toml | 1 + crates/iceberg/Cargo.toml | 1 + crates/iceberg/src/lib.rs | 1 + crates/iceberg/src/metadata_scan.rs | 267 ++++++++++++++++++++++++++++ crates/iceberg/src/scan.rs | 9 +- crates/iceberg/src/spec/snapshot.rs | 12 ++ crates/iceberg/src/table.rs | 6 + 7 files changed, 293 insertions(+), 4 deletions(-) create mode 100644 crates/iceberg/src/metadata_scan.rs diff --git a/Cargo.toml b/Cargo.toml index b796308be5..5b1dca422d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -101,3 +101,4 @@ volo-thrift = "0.10" hive_metastore = "0.1" tera = "1" zstd = "0.13.2" +expect-test = "1" diff --git a/crates/iceberg/Cargo.toml b/crates/iceberg/Cargo.toml index f84e7ab67a..1b236811a0 100644 --- a/crates/iceberg/Cargo.toml +++ b/crates/iceberg/Cargo.toml @@ -92,3 +92,4 @@ pretty_assertions = { workspace = true } rand = { workspace = true } tempfile = { workspace = true } tera = { workspace = true } +expect-test = { workspace = true } diff --git a/crates/iceberg/src/lib.rs b/crates/iceberg/src/lib.rs index eaecfea609..1946f35f33 100644 --- a/crates/iceberg/src/lib.rs +++ b/crates/iceberg/src/lib.rs @@ -73,6 +73,7 @@ mod avro; pub mod io; pub mod spec; +pub mod metadata_scan; pub mod scan; pub mod expr; diff --git a/crates/iceberg/src/metadata_scan.rs b/crates/iceberg/src/metadata_scan.rs new file mode 100644 index 0000000000..6205d6e011 --- /dev/null +++ b/crates/iceberg/src/metadata_scan.rs @@ -0,0 +1,267 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Metadata table api. + +use std::sync::Arc; + +use arrow_array::builder::{MapBuilder, PrimitiveBuilder, StringBuilder}; +use arrow_array::types::{Int64Type, TimestampMillisecondType}; +use arrow_array::RecordBatch; +use arrow_schema::{DataType, Field, Schema, TimeUnit}; + +use crate::spec::TableMetadataRef; +use crate::table::Table; +use crate::Result; + +/// Table metadata scan. +#[derive(Debug)] +pub struct MetadataScan { + metadata_ref: TableMetadataRef, +} + +impl MetadataScan { + /// Creates a new metadata scan. + pub fn new(table: &Table) -> Self { + Self { + metadata_ref: table.metadata_ref(), + } + } + + /// Returns the snapshots of the table. + pub fn snapshots(&self) -> Result { + SnapshotsTable::scan(self) + } +} + +/// References: +/// - +/// - +/// - +pub trait MetadataTable { + /// Returns the schema of the metadata table. + fn schema() -> Schema; + + /// Scans the metadata table. + fn scan(scan: &MetadataScan) -> Result; +} + +/// Snapshots table. +pub struct SnapshotsTable; + +impl MetadataTable for SnapshotsTable { + fn schema() -> Schema { + // committed_at: timestamp[ms] not null + // snapshot_id: int64 not null + // parent_id: int64 + // operation: string + // manifest_list: string not null + // summary: map + // child 0, entries: struct not null + // child 0, key: string not null + // child 1, value: string + Schema::new(vec![ + Field::new( + "committed_at", + DataType::Timestamp(TimeUnit::Millisecond, Some("+00:00".into())), + false, + ), + Field::new("snapshot_id", DataType::Int64, false), + Field::new("parent_id", DataType::Int64, true), + Field::new("operation", DataType::Utf8, false), + Field::new("manifest_list", DataType::Utf8, false), + Field::new( + "summary", + DataType::Map( + Arc::new(Field::new( + "entries", + DataType::Struct( + vec![ + Field::new("keys", DataType::Utf8, false), + Field::new("values", DataType::Utf8, true), + ] + .into(), + ), + false, + )), + false, + ), + false, + ), + ]) + } + + fn scan(scan: &MetadataScan) -> Result { + let mut committed_at = + PrimitiveBuilder::::new().with_timezone("+00:00"); + let mut snapshot_id = PrimitiveBuilder::::new(); + let mut parent_id = PrimitiveBuilder::::new(); + let mut operation = StringBuilder::new(); + let mut manifest_list = StringBuilder::new(); + let mut summary = MapBuilder::new(None, StringBuilder::new(), StringBuilder::new()); + + for snapshot in scan.metadata_ref.snapshots() { + committed_at.append_value(snapshot.timestamp_ms()); + snapshot_id.append_value(snapshot.snapshot_id()); + parent_id.append_option(snapshot.parent_snapshot_id()); + manifest_list.append_value(snapshot.manifest_list()); + operation.append_value(snapshot.summary().operation.as_str()); + for (key, value) in &snapshot.summary().additional_properties { + summary.keys().append_value(key); + summary.values().append_value(value); + } + summary.append(true)?; + } + + Ok(RecordBatch::try_new(Arc::new(Self::schema()), vec![ + Arc::new(committed_at.finish()), + Arc::new(snapshot_id.finish()), + Arc::new(parent_id.finish()), + Arc::new(operation.finish()), + Arc::new(manifest_list.finish()), + Arc::new(summary.finish()), + ])?) + } +} + +#[cfg(test)] +mod tests { + use expect_test::{expect, Expect}; + use itertools::Itertools; + + use super::*; + use crate::scan::tests::TableTestFixture; + + /// Snapshot testing to check the resulting record batch. + /// + /// - `expected_schema/data`: put `expect![[""]]` as a placeholder, + /// and then run test with `UPDATE_EXPECT=1 cargo test` to automatically update the result, + /// or use rust-analyzer (see [video](https://github.com/rust-analyzer/expect-test)). + /// Check the doc of [`expect_test`] for more details. + /// - `ignore_check_columns`: Some columns are not stable, so we can skip them. + /// - `sort_column`: The order of the data might be non-deterministic, so we can sort it by a column. + fn check_record_batch( + record_batch: RecordBatch, + expected_schema: Expect, + expected_data: Expect, + ignore_check_columns: &[&str], + sort_column: Option<&str>, + ) { + let mut columns = record_batch.columns().to_vec(); + if let Some(sort_column) = sort_column { + let column = record_batch.column_by_name(sort_column).unwrap(); + let indices = arrow_ord::sort::sort_to_indices(column, None, None).unwrap(); + columns = columns + .iter() + .map(|column| arrow_select::take::take(column.as_ref(), &indices, None).unwrap()) + .collect_vec(); + } + + expected_schema.assert_eq(&format!( + "{}", + record_batch.schema().fields().iter().format(",\n") + )); + expected_data.assert_eq(&format!( + "{}", + record_batch + .schema() + .fields() + .iter() + .zip_eq(columns) + .map(|(field, column)| { + if ignore_check_columns.contains(&field.name().as_str()) { + format!("{}: (skipped)", field.name()) + } else { + format!("{}: {:?}", field.name(), column) + } + }) + .format(",\n") + )); + } + + #[test] + fn test_snapshots_table() { + let table = TableTestFixture::new().table; + let scan = MetadataScan::new(&table); + let record_batch = SnapshotsTable::scan(&scan).unwrap(); + check_record_batch( + record_batch, + expect![[r#" + Field { name: "committed_at", data_type: Timestamp(Millisecond, Some("+00:00")), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, + Field { name: "snapshot_id", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, + Field { name: "parent_id", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, + Field { name: "operation", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, + Field { name: "manifest_list", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, + Field { name: "summary", data_type: Map(Field { name: "entries", data_type: Struct([Field { name: "keys", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "values", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }]), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, false), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }"#]], + expect![[r#" + committed_at: PrimitiveArray + [ + 2018-01-04T21:22:35.770+00:00, + 2019-04-12T20:29:15.770+00:00, + ], + snapshot_id: PrimitiveArray + [ + 3051729675574597004, + 3055729675574597004, + ], + parent_id: PrimitiveArray + [ + null, + 3051729675574597004, + ], + operation: StringArray + [ + "append", + "append", + ], + manifest_list: (skipped), + summary: MapArray + [ + StructArray + -- validity: + [ + ] + [ + -- child 0: "keys" (Utf8) + StringArray + [ + ] + -- child 1: "values" (Utf8) + StringArray + [ + ] + ], + StructArray + -- validity: + [ + ] + [ + -- child 0: "keys" (Utf8) + StringArray + [ + ] + -- child 1: "values" (Utf8) + StringArray + [ + ] + ], + ]"#]], + &["manifest_list"], + Some("committed_at"), + ); + } +} diff --git a/crates/iceberg/src/scan.rs b/crates/iceberg/src/scan.rs index 7a100b3467..cb3e5d8c8b 100644 --- a/crates/iceberg/src/scan.rs +++ b/crates/iceberg/src/scan.rs @@ -961,7 +961,7 @@ impl FileScanTask { } #[cfg(test)] -mod tests { +pub mod tests { use std::collections::HashMap; use std::fs; use std::fs::File; @@ -990,13 +990,14 @@ mod tests { use crate::table::Table; use crate::TableIdent; - struct TableTestFixture { + pub struct TableTestFixture { table_location: String, - table: Table, + pub table: Table, } impl TableTestFixture { - fn new() -> Self { + #[allow(clippy::new_without_default)] + pub fn new() -> Self { let tmp_dir = TempDir::new().unwrap(); let table_location = tmp_dir.path().join("table1"); let manifest_list1_location = table_location.join("metadata/manifests_list_1.avro"); diff --git a/crates/iceberg/src/spec/snapshot.rs b/crates/iceberg/src/spec/snapshot.rs index 81fd6eae66..f24a3c26ba 100644 --- a/crates/iceberg/src/spec/snapshot.rs +++ b/crates/iceberg/src/spec/snapshot.rs @@ -52,6 +52,18 @@ pub enum Operation { Delete, } +impl Operation { + /// Returns the string representation (lowercase) of the operation. + pub fn as_str(&self) -> &str { + match self { + Operation::Append => "append", + Operation::Replace => "replace", + Operation::Overwrite => "overwrite", + Operation::Delete => "delete", + } + } +} + #[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)] /// Summarises the changes in the snapshot. pub struct Summary { diff --git a/crates/iceberg/src/table.rs b/crates/iceberg/src/table.rs index 406f9dd654..6c5823f944 100644 --- a/crates/iceberg/src/table.rs +++ b/crates/iceberg/src/table.rs @@ -22,6 +22,7 @@ use std::sync::Arc; use crate::arrow::ArrowReaderBuilder; use crate::io::object_cache::ObjectCache; use crate::io::FileIO; +use crate::metadata_scan::MetadataScan; use crate::scan::TableScanBuilder; use crate::spec::{TableMetadata, TableMetadataRef}; use crate::{Error, ErrorKind, Result, TableIdent}; @@ -200,6 +201,11 @@ impl Table { TableScanBuilder::new(self) } + /// Creates a metadata scan. + pub fn metadata_scan(&self) -> MetadataScan { + MetadataScan::new(self) + } + /// Returns the flag indicating whether the `Table` is readonly or not pub fn readonly(&self) -> bool { self.readonly From 66dc35de8b4784b0bf3c62ce094a23ce09a73fb6 Mon Sep 17 00:00:00 2001 From: xxchan Date: Wed, 18 Dec 2024 21:34:02 +0800 Subject: [PATCH 2/8] improve doc Signed-off-by: xxchan --- crates/iceberg/src/metadata_scan.rs | 20 +++++++++----------- crates/iceberg/src/table.rs | 2 +- 2 files changed, 10 insertions(+), 12 deletions(-) diff --git a/crates/iceberg/src/metadata_scan.rs b/crates/iceberg/src/metadata_scan.rs index 6205d6e011..fa58acfb72 100644 --- a/crates/iceberg/src/metadata_scan.rs +++ b/crates/iceberg/src/metadata_scan.rs @@ -29,6 +29,10 @@ use crate::table::Table; use crate::Result; /// Table metadata scan. +/// +/// Used to inspect a table's history, snapshots, and other metadata as a table. +/// +/// See also . #[derive(Debug)] pub struct MetadataScan { metadata_ref: TableMetadataRef, @@ -48,6 +52,10 @@ impl MetadataScan { } } +/// Table metadata scan. +/// +/// Use to inspect a table's history, snapshots, and other metadata as a table. +/// /// References: /// - /// - @@ -65,15 +73,6 @@ pub struct SnapshotsTable; impl MetadataTable for SnapshotsTable { fn schema() -> Schema { - // committed_at: timestamp[ms] not null - // snapshot_id: int64 not null - // parent_id: int64 - // operation: string - // manifest_list: string not null - // summary: map - // child 0, entries: struct not null - // child 0, key: string not null - // child 1, value: string Schema::new(vec![ Field::new( "committed_at", @@ -196,8 +195,7 @@ mod tests { #[test] fn test_snapshots_table() { let table = TableTestFixture::new().table; - let scan = MetadataScan::new(&table); - let record_batch = SnapshotsTable::scan(&scan).unwrap(); + let record_batch = table.metadata_scan().snapshots(); check_record_batch( record_batch, expect![[r#" diff --git a/crates/iceberg/src/table.rs b/crates/iceberg/src/table.rs index 6c5823f944..f78e04e2ff 100644 --- a/crates/iceberg/src/table.rs +++ b/crates/iceberg/src/table.rs @@ -201,7 +201,7 @@ impl Table { TableScanBuilder::new(self) } - /// Creates a metadata scan. + /// Creates a metadata scan. See [`MetadataScan`] for more details. pub fn metadata_scan(&self) -> MetadataScan { MetadataScan::new(self) } From fa9d1e3fe26d2e8e88a1c47642b1fc2a309b6758 Mon Sep 17 00:00:00 2001 From: xxchan Date: Wed, 18 Dec 2024 21:34:45 +0800 Subject: [PATCH 3/8] cargo sort Signed-off-by: xxchan --- crates/iceberg/Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/iceberg/Cargo.toml b/crates/iceberg/Cargo.toml index 1b236811a0..7f323722f5 100644 --- a/crates/iceberg/Cargo.toml +++ b/crates/iceberg/Cargo.toml @@ -86,10 +86,10 @@ zstd = { workspace = true } [dev-dependencies] ctor = { workspace = true } +expect-test = { workspace = true } iceberg-catalog-memory = { workspace = true } iceberg_test_utils = { path = "../test_utils", features = ["tests"] } pretty_assertions = { workspace = true } rand = { workspace = true } tempfile = { workspace = true } tera = { workspace = true } -expect-test = { workspace = true } From f40aa87f15653ec70910eb4c1089a1d6529affe2 Mon Sep 17 00:00:00 2001 From: xxchan Date: Wed, 18 Dec 2024 21:47:38 +0800 Subject: [PATCH 4/8] fix test Signed-off-by: xxchan --- crates/iceberg/src/metadata_scan.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/iceberg/src/metadata_scan.rs b/crates/iceberg/src/metadata_scan.rs index fa58acfb72..6e47eb796f 100644 --- a/crates/iceberg/src/metadata_scan.rs +++ b/crates/iceberg/src/metadata_scan.rs @@ -195,7 +195,7 @@ mod tests { #[test] fn test_snapshots_table() { let table = TableTestFixture::new().table; - let record_batch = table.metadata_scan().snapshots(); + let record_batch = table.metadata_scan().snapshots().unwrap(); check_record_batch( record_batch, expect![[r#" From d32c2e60f3fb8122b70ad6e90ae5e6b9e765c29f Mon Sep 17 00:00:00 2001 From: xxchan Date: Mon, 30 Dec 2024 20:52:10 +0800 Subject: [PATCH 5/8] redesign interface Signed-off-by: xxchan --- Cargo.lock | 17 ++++++++ crates/iceberg/src/metadata_scan.rs | 62 +++++++++++++---------------- 2 files changed, 45 insertions(+), 34 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index ddb0c77b83..2c2dc82600 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2135,6 +2135,12 @@ dependencies = [ "syn 2.0.92", ] +[[package]] +name = "dissimilar" +version = "1.0.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "59f8e79d1fbf76bdfbde321e902714bf6c49df88a7dda6fc682fc2979226962d" + [[package]] name = "dlv-list" version = "0.5.2" @@ -2236,6 +2242,16 @@ dependencies = [ "pin-project-lite", ] +[[package]] +name = "expect-test" +version = "1.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "63af43ff4431e848fb47472a920f14fa71c24de13255a5692e93d4e90302acb0" +dependencies = [ + "dissimilar", + "once_cell", +] + [[package]] name = "fastrand" version = "2.3.0" @@ -2868,6 +2884,7 @@ dependencies = [ "chrono", "ctor", "derive_builder", + "expect-test", "fnv", "futures", "iceberg-catalog-memory", diff --git a/crates/iceberg/src/metadata_scan.rs b/crates/iceberg/src/metadata_scan.rs index 6e47eb796f..21e954ab81 100644 --- a/crates/iceberg/src/metadata_scan.rs +++ b/crates/iceberg/src/metadata_scan.rs @@ -32,7 +32,10 @@ use crate::Result; /// /// Used to inspect a table's history, snapshots, and other metadata as a table. /// -/// See also . +/// References: +/// - +/// - +/// - #[derive(Debug)] pub struct MetadataScan { metadata_ref: TableMetadataRef, @@ -46,33 +49,20 @@ impl MetadataScan { } } - /// Returns the snapshots of the table. - pub fn snapshots(&self) -> Result { - SnapshotsTable::scan(self) + /// Get the snapshots table. + pub fn snapshots(&self) -> SnapshotsTable { + SnapshotsTable { metadata: self } } } -/// Table metadata scan. -/// -/// Use to inspect a table's history, snapshots, and other metadata as a table. -/// -/// References: -/// - -/// - -/// - -pub trait MetadataTable { - /// Returns the schema of the metadata table. - fn schema() -> Schema; - - /// Scans the metadata table. - fn scan(scan: &MetadataScan) -> Result; -} - /// Snapshots table. -pub struct SnapshotsTable; +pub struct SnapshotsTable<'a> { + metadata: &'a MetadataScan, +} -impl MetadataTable for SnapshotsTable { - fn schema() -> Schema { +impl<'a> SnapshotsTable<'a> { + /// Returns the schema of the snapshots table. + pub fn schema(&self) -> Schema { Schema::new(vec![ Field::new( "committed_at", @@ -104,7 +94,8 @@ impl MetadataTable for SnapshotsTable { ]) } - fn scan(scan: &MetadataScan) -> Result { + /// Scans the snapshots table. + pub fn scan(&self) -> Result { let mut committed_at = PrimitiveBuilder::::new().with_timezone("+00:00"); let mut snapshot_id = PrimitiveBuilder::::new(); @@ -113,7 +104,7 @@ impl MetadataTable for SnapshotsTable { let mut manifest_list = StringBuilder::new(); let mut summary = MapBuilder::new(None, StringBuilder::new(), StringBuilder::new()); - for snapshot in scan.metadata_ref.snapshots() { + for snapshot in self.metadata.metadata_ref.snapshots() { committed_at.append_value(snapshot.timestamp_ms()); snapshot_id.append_value(snapshot.snapshot_id()); parent_id.append_option(snapshot.parent_snapshot_id()); @@ -126,14 +117,17 @@ impl MetadataTable for SnapshotsTable { summary.append(true)?; } - Ok(RecordBatch::try_new(Arc::new(Self::schema()), vec![ - Arc::new(committed_at.finish()), - Arc::new(snapshot_id.finish()), - Arc::new(parent_id.finish()), - Arc::new(operation.finish()), - Arc::new(manifest_list.finish()), - Arc::new(summary.finish()), - ])?) + Ok(RecordBatch::try_new( + Arc::new(self.schema()), + vec![ + Arc::new(committed_at.finish()), + Arc::new(snapshot_id.finish()), + Arc::new(parent_id.finish()), + Arc::new(operation.finish()), + Arc::new(manifest_list.finish()), + Arc::new(summary.finish()), + ], + )?) } } @@ -195,7 +189,7 @@ mod tests { #[test] fn test_snapshots_table() { let table = TableTestFixture::new().table; - let record_batch = table.metadata_scan().snapshots().unwrap(); + let record_batch = table.metadata_scan().snapshots().scan().unwrap(); check_record_batch( record_batch, expect![[r#" From c2b329b820ef0f02bb0a48c36a363b605011698e Mon Sep 17 00:00:00 2001 From: xxchan Date: Mon, 30 Dec 2024 21:12:22 +0800 Subject: [PATCH 6/8] fmt Signed-off-by: xxchan --- crates/iceberg/src/metadata_scan.rs | 19 ++++++++----------- 1 file changed, 8 insertions(+), 11 deletions(-) diff --git a/crates/iceberg/src/metadata_scan.rs b/crates/iceberg/src/metadata_scan.rs index 21e954ab81..c7445e748d 100644 --- a/crates/iceberg/src/metadata_scan.rs +++ b/crates/iceberg/src/metadata_scan.rs @@ -117,17 +117,14 @@ impl<'a> SnapshotsTable<'a> { summary.append(true)?; } - Ok(RecordBatch::try_new( - Arc::new(self.schema()), - vec![ - Arc::new(committed_at.finish()), - Arc::new(snapshot_id.finish()), - Arc::new(parent_id.finish()), - Arc::new(operation.finish()), - Arc::new(manifest_list.finish()), - Arc::new(summary.finish()), - ], - )?) + Ok(RecordBatch::try_new(Arc::new(self.schema()), vec![ + Arc::new(committed_at.finish()), + Arc::new(snapshot_id.finish()), + Arc::new(parent_id.finish()), + Arc::new(operation.finish()), + Arc::new(manifest_list.finish()), + Arc::new(summary.finish()), + ])?) } } From f7b5923c26c8d865a442f9c1849e59ef945e7389 Mon Sep 17 00:00:00 2001 From: xxchan Date: Mon, 30 Dec 2024 22:41:53 +0800 Subject: [PATCH 7/8] resolve comments Signed-off-by: xxchan --- crates/iceberg/src/metadata_scan.rs | 51 +++++++++++++++-------------- crates/iceberg/src/table.rs | 9 ++--- 2 files changed, 32 insertions(+), 28 deletions(-) diff --git a/crates/iceberg/src/metadata_scan.rs b/crates/iceberg/src/metadata_scan.rs index c7445e748d..ed5cd34c63 100644 --- a/crates/iceberg/src/metadata_scan.rs +++ b/crates/iceberg/src/metadata_scan.rs @@ -24,40 +24,40 @@ use arrow_array::types::{Int64Type, TimestampMillisecondType}; use arrow_array::RecordBatch; use arrow_schema::{DataType, Field, Schema, TimeUnit}; -use crate::spec::TableMetadataRef; +use crate::spec::TableMetadata; use crate::table::Table; use crate::Result; -/// Table metadata scan. -/// -/// Used to inspect a table's history, snapshots, and other metadata as a table. +/// Metadata table is used to inspect a table's history, snapshots, and other metadata as a table. /// /// References: /// - /// - /// - #[derive(Debug)] -pub struct MetadataScan { - metadata_ref: TableMetadataRef, -} +pub struct MetadataTable(Table); -impl MetadataScan { +impl MetadataTable { /// Creates a new metadata scan. - pub fn new(table: &Table) -> Self { - Self { - metadata_ref: table.metadata_ref(), - } + pub(super) fn new(table: Table) -> Self { + Self(table) } /// Get the snapshots table. pub fn snapshots(&self) -> SnapshotsTable { - SnapshotsTable { metadata: self } + SnapshotsTable { + metadata_table: self, + } + } + + fn metadata(&self) -> &TableMetadata { + self.0.metadata() } } /// Snapshots table. pub struct SnapshotsTable<'a> { - metadata: &'a MetadataScan, + metadata_table: &'a MetadataTable, } impl<'a> SnapshotsTable<'a> { @@ -104,7 +104,7 @@ impl<'a> SnapshotsTable<'a> { let mut manifest_list = StringBuilder::new(); let mut summary = MapBuilder::new(None, StringBuilder::new(), StringBuilder::new()); - for snapshot in self.metadata.metadata_ref.snapshots() { + for snapshot in self.metadata_table.metadata().snapshots() { committed_at.append_value(snapshot.timestamp_ms()); snapshot_id.append_value(snapshot.snapshot_id()); parent_id.append_option(snapshot.parent_snapshot_id()); @@ -117,14 +117,17 @@ impl<'a> SnapshotsTable<'a> { summary.append(true)?; } - Ok(RecordBatch::try_new(Arc::new(self.schema()), vec![ - Arc::new(committed_at.finish()), - Arc::new(snapshot_id.finish()), - Arc::new(parent_id.finish()), - Arc::new(operation.finish()), - Arc::new(manifest_list.finish()), - Arc::new(summary.finish()), - ])?) + Ok(RecordBatch::try_new( + Arc::new(self.schema()), + vec![ + Arc::new(committed_at.finish()), + Arc::new(snapshot_id.finish()), + Arc::new(parent_id.finish()), + Arc::new(operation.finish()), + Arc::new(manifest_list.finish()), + Arc::new(summary.finish()), + ], + )?) } } @@ -186,7 +189,7 @@ mod tests { #[test] fn test_snapshots_table() { let table = TableTestFixture::new().table; - let record_batch = table.metadata_scan().snapshots().scan().unwrap(); + let record_batch = table.metadata_table().snapshots().scan().unwrap(); check_record_batch( record_batch, expect![[r#" diff --git a/crates/iceberg/src/table.rs b/crates/iceberg/src/table.rs index f78e04e2ff..fa53048559 100644 --- a/crates/iceberg/src/table.rs +++ b/crates/iceberg/src/table.rs @@ -22,7 +22,7 @@ use std::sync::Arc; use crate::arrow::ArrowReaderBuilder; use crate::io::object_cache::ObjectCache; use crate::io::FileIO; -use crate::metadata_scan::MetadataScan; +use crate::metadata_scan::MetadataTable; use crate::scan::TableScanBuilder; use crate::spec::{TableMetadata, TableMetadataRef}; use crate::{Error, ErrorKind, Result, TableIdent}; @@ -201,9 +201,10 @@ impl Table { TableScanBuilder::new(self) } - /// Creates a metadata scan. See [`MetadataScan`] for more details. - pub fn metadata_scan(&self) -> MetadataScan { - MetadataScan::new(self) + /// Creates a metadata table which provides table-like APIs for inspecting metadata. + /// See [`MetadataTable`] for more details. + pub fn metadata_table(self) -> MetadataTable { + MetadataTable::new(self) } /// Returns the flag indicating whether the `Table` is readonly or not From a83c2aebde909fb13538d1dfb8cce76a506a825a Mon Sep 17 00:00:00 2001 From: xxchan Date: Mon, 30 Dec 2024 22:43:19 +0800 Subject: [PATCH 8/8] fmt Signed-off-by: xxchan --- crates/iceberg/src/metadata_scan.rs | 19 ++++++++----------- 1 file changed, 8 insertions(+), 11 deletions(-) diff --git a/crates/iceberg/src/metadata_scan.rs b/crates/iceberg/src/metadata_scan.rs index ed5cd34c63..942d7605c3 100644 --- a/crates/iceberg/src/metadata_scan.rs +++ b/crates/iceberg/src/metadata_scan.rs @@ -117,17 +117,14 @@ impl<'a> SnapshotsTable<'a> { summary.append(true)?; } - Ok(RecordBatch::try_new( - Arc::new(self.schema()), - vec![ - Arc::new(committed_at.finish()), - Arc::new(snapshot_id.finish()), - Arc::new(parent_id.finish()), - Arc::new(operation.finish()), - Arc::new(manifest_list.finish()), - Arc::new(summary.finish()), - ], - )?) + Ok(RecordBatch::try_new(Arc::new(self.schema()), vec![ + Arc::new(committed_at.finish()), + Arc::new(snapshot_id.finish()), + Arc::new(parent_id.finish()), + Arc::new(operation.finish()), + Arc::new(manifest_list.finish()), + Arc::new(summary.finish()), + ])?) } }