From 7408b05d453bd1062f53bcb9364df73e42895245 Mon Sep 17 00:00:00 2001 From: xxchan Date: Wed, 18 Dec 2024 21:07:35 +0800 Subject: [PATCH 01/16] feat: support metadata table "snapshots" Signed-off-by: xxchan --- Cargo.toml | 1 + crates/iceberg/Cargo.toml | 1 + crates/iceberg/src/lib.rs | 1 + crates/iceberg/src/metadata_scan.rs | 267 ++++++++++++++++++++++++++++ crates/iceberg/src/scan.rs | 9 +- crates/iceberg/src/spec/snapshot.rs | 12 ++ crates/iceberg/src/table.rs | 6 + 7 files changed, 293 insertions(+), 4 deletions(-) create mode 100644 crates/iceberg/src/metadata_scan.rs diff --git a/Cargo.toml b/Cargo.toml index b796308be5..5b1dca422d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -101,3 +101,4 @@ volo-thrift = "0.10" hive_metastore = "0.1" tera = "1" zstd = "0.13.2" +expect-test = "1" diff --git a/crates/iceberg/Cargo.toml b/crates/iceberg/Cargo.toml index f84e7ab67a..1b236811a0 100644 --- a/crates/iceberg/Cargo.toml +++ b/crates/iceberg/Cargo.toml @@ -92,3 +92,4 @@ pretty_assertions = { workspace = true } rand = { workspace = true } tempfile = { workspace = true } tera = { workspace = true } +expect-test = { workspace = true } diff --git a/crates/iceberg/src/lib.rs b/crates/iceberg/src/lib.rs index eaecfea609..1946f35f33 100644 --- a/crates/iceberg/src/lib.rs +++ b/crates/iceberg/src/lib.rs @@ -73,6 +73,7 @@ mod avro; pub mod io; pub mod spec; +pub mod metadata_scan; pub mod scan; pub mod expr; diff --git a/crates/iceberg/src/metadata_scan.rs b/crates/iceberg/src/metadata_scan.rs new file mode 100644 index 0000000000..6205d6e011 --- /dev/null +++ b/crates/iceberg/src/metadata_scan.rs @@ -0,0 +1,267 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Metadata table api. + +use std::sync::Arc; + +use arrow_array::builder::{MapBuilder, PrimitiveBuilder, StringBuilder}; +use arrow_array::types::{Int64Type, TimestampMillisecondType}; +use arrow_array::RecordBatch; +use arrow_schema::{DataType, Field, Schema, TimeUnit}; + +use crate::spec::TableMetadataRef; +use crate::table::Table; +use crate::Result; + +/// Table metadata scan. +#[derive(Debug)] +pub struct MetadataScan { + metadata_ref: TableMetadataRef, +} + +impl MetadataScan { + /// Creates a new metadata scan. + pub fn new(table: &Table) -> Self { + Self { + metadata_ref: table.metadata_ref(), + } + } + + /// Returns the snapshots of the table. + pub fn snapshots(&self) -> Result { + SnapshotsTable::scan(self) + } +} + +/// References: +/// - +/// - +/// - +pub trait MetadataTable { + /// Returns the schema of the metadata table. + fn schema() -> Schema; + + /// Scans the metadata table. + fn scan(scan: &MetadataScan) -> Result; +} + +/// Snapshots table. +pub struct SnapshotsTable; + +impl MetadataTable for SnapshotsTable { + fn schema() -> Schema { + // committed_at: timestamp[ms] not null + // snapshot_id: int64 not null + // parent_id: int64 + // operation: string + // manifest_list: string not null + // summary: map + // child 0, entries: struct not null + // child 0, key: string not null + // child 1, value: string + Schema::new(vec![ + Field::new( + "committed_at", + DataType::Timestamp(TimeUnit::Millisecond, Some("+00:00".into())), + false, + ), + Field::new("snapshot_id", DataType::Int64, false), + Field::new("parent_id", DataType::Int64, true), + Field::new("operation", DataType::Utf8, false), + Field::new("manifest_list", DataType::Utf8, false), + Field::new( + "summary", + DataType::Map( + Arc::new(Field::new( + "entries", + DataType::Struct( + vec![ + Field::new("keys", DataType::Utf8, false), + Field::new("values", DataType::Utf8, true), + ] + .into(), + ), + false, + )), + false, + ), + false, + ), + ]) + } + + fn scan(scan: &MetadataScan) -> Result { + let mut committed_at = + PrimitiveBuilder::::new().with_timezone("+00:00"); + let mut snapshot_id = PrimitiveBuilder::::new(); + let mut parent_id = PrimitiveBuilder::::new(); + let mut operation = StringBuilder::new(); + let mut manifest_list = StringBuilder::new(); + let mut summary = MapBuilder::new(None, StringBuilder::new(), StringBuilder::new()); + + for snapshot in scan.metadata_ref.snapshots() { + committed_at.append_value(snapshot.timestamp_ms()); + snapshot_id.append_value(snapshot.snapshot_id()); + parent_id.append_option(snapshot.parent_snapshot_id()); + manifest_list.append_value(snapshot.manifest_list()); + operation.append_value(snapshot.summary().operation.as_str()); + for (key, value) in &snapshot.summary().additional_properties { + summary.keys().append_value(key); + summary.values().append_value(value); + } + summary.append(true)?; + } + + Ok(RecordBatch::try_new(Arc::new(Self::schema()), vec![ + Arc::new(committed_at.finish()), + Arc::new(snapshot_id.finish()), + Arc::new(parent_id.finish()), + Arc::new(operation.finish()), + Arc::new(manifest_list.finish()), + Arc::new(summary.finish()), + ])?) + } +} + +#[cfg(test)] +mod tests { + use expect_test::{expect, Expect}; + use itertools::Itertools; + + use super::*; + use crate::scan::tests::TableTestFixture; + + /// Snapshot testing to check the resulting record batch. + /// + /// - `expected_schema/data`: put `expect![[""]]` as a placeholder, + /// and then run test with `UPDATE_EXPECT=1 cargo test` to automatically update the result, + /// or use rust-analyzer (see [video](https://github.com/rust-analyzer/expect-test)). + /// Check the doc of [`expect_test`] for more details. + /// - `ignore_check_columns`: Some columns are not stable, so we can skip them. + /// - `sort_column`: The order of the data might be non-deterministic, so we can sort it by a column. + fn check_record_batch( + record_batch: RecordBatch, + expected_schema: Expect, + expected_data: Expect, + ignore_check_columns: &[&str], + sort_column: Option<&str>, + ) { + let mut columns = record_batch.columns().to_vec(); + if let Some(sort_column) = sort_column { + let column = record_batch.column_by_name(sort_column).unwrap(); + let indices = arrow_ord::sort::sort_to_indices(column, None, None).unwrap(); + columns = columns + .iter() + .map(|column| arrow_select::take::take(column.as_ref(), &indices, None).unwrap()) + .collect_vec(); + } + + expected_schema.assert_eq(&format!( + "{}", + record_batch.schema().fields().iter().format(",\n") + )); + expected_data.assert_eq(&format!( + "{}", + record_batch + .schema() + .fields() + .iter() + .zip_eq(columns) + .map(|(field, column)| { + if ignore_check_columns.contains(&field.name().as_str()) { + format!("{}: (skipped)", field.name()) + } else { + format!("{}: {:?}", field.name(), column) + } + }) + .format(",\n") + )); + } + + #[test] + fn test_snapshots_table() { + let table = TableTestFixture::new().table; + let scan = MetadataScan::new(&table); + let record_batch = SnapshotsTable::scan(&scan).unwrap(); + check_record_batch( + record_batch, + expect![[r#" + Field { name: "committed_at", data_type: Timestamp(Millisecond, Some("+00:00")), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, + Field { name: "snapshot_id", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, + Field { name: "parent_id", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, + Field { name: "operation", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, + Field { name: "manifest_list", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, + Field { name: "summary", data_type: Map(Field { name: "entries", data_type: Struct([Field { name: "keys", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "values", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }]), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, false), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }"#]], + expect![[r#" + committed_at: PrimitiveArray + [ + 2018-01-04T21:22:35.770+00:00, + 2019-04-12T20:29:15.770+00:00, + ], + snapshot_id: PrimitiveArray + [ + 3051729675574597004, + 3055729675574597004, + ], + parent_id: PrimitiveArray + [ + null, + 3051729675574597004, + ], + operation: StringArray + [ + "append", + "append", + ], + manifest_list: (skipped), + summary: MapArray + [ + StructArray + -- validity: + [ + ] + [ + -- child 0: "keys" (Utf8) + StringArray + [ + ] + -- child 1: "values" (Utf8) + StringArray + [ + ] + ], + StructArray + -- validity: + [ + ] + [ + -- child 0: "keys" (Utf8) + StringArray + [ + ] + -- child 1: "values" (Utf8) + StringArray + [ + ] + ], + ]"#]], + &["manifest_list"], + Some("committed_at"), + ); + } +} diff --git a/crates/iceberg/src/scan.rs b/crates/iceberg/src/scan.rs index 7a100b3467..cb3e5d8c8b 100644 --- a/crates/iceberg/src/scan.rs +++ b/crates/iceberg/src/scan.rs @@ -961,7 +961,7 @@ impl FileScanTask { } #[cfg(test)] -mod tests { +pub mod tests { use std::collections::HashMap; use std::fs; use std::fs::File; @@ -990,13 +990,14 @@ mod tests { use crate::table::Table; use crate::TableIdent; - struct TableTestFixture { + pub struct TableTestFixture { table_location: String, - table: Table, + pub table: Table, } impl TableTestFixture { - fn new() -> Self { + #[allow(clippy::new_without_default)] + pub fn new() -> Self { let tmp_dir = TempDir::new().unwrap(); let table_location = tmp_dir.path().join("table1"); let manifest_list1_location = table_location.join("metadata/manifests_list_1.avro"); diff --git a/crates/iceberg/src/spec/snapshot.rs b/crates/iceberg/src/spec/snapshot.rs index 81fd6eae66..f24a3c26ba 100644 --- a/crates/iceberg/src/spec/snapshot.rs +++ b/crates/iceberg/src/spec/snapshot.rs @@ -52,6 +52,18 @@ pub enum Operation { Delete, } +impl Operation { + /// Returns the string representation (lowercase) of the operation. + pub fn as_str(&self) -> &str { + match self { + Operation::Append => "append", + Operation::Replace => "replace", + Operation::Overwrite => "overwrite", + Operation::Delete => "delete", + } + } +} + #[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)] /// Summarises the changes in the snapshot. pub struct Summary { diff --git a/crates/iceberg/src/table.rs b/crates/iceberg/src/table.rs index 406f9dd654..6c5823f944 100644 --- a/crates/iceberg/src/table.rs +++ b/crates/iceberg/src/table.rs @@ -22,6 +22,7 @@ use std::sync::Arc; use crate::arrow::ArrowReaderBuilder; use crate::io::object_cache::ObjectCache; use crate::io::FileIO; +use crate::metadata_scan::MetadataScan; use crate::scan::TableScanBuilder; use crate::spec::{TableMetadata, TableMetadataRef}; use crate::{Error, ErrorKind, Result, TableIdent}; @@ -200,6 +201,11 @@ impl Table { TableScanBuilder::new(self) } + /// Creates a metadata scan. + pub fn metadata_scan(&self) -> MetadataScan { + MetadataScan::new(self) + } + /// Returns the flag indicating whether the `Table` is readonly or not pub fn readonly(&self) -> bool { self.readonly From 66dc35de8b4784b0bf3c62ce094a23ce09a73fb6 Mon Sep 17 00:00:00 2001 From: xxchan Date: Wed, 18 Dec 2024 21:34:02 +0800 Subject: [PATCH 02/16] improve doc Signed-off-by: xxchan --- crates/iceberg/src/metadata_scan.rs | 20 +++++++++----------- crates/iceberg/src/table.rs | 2 +- 2 files changed, 10 insertions(+), 12 deletions(-) diff --git a/crates/iceberg/src/metadata_scan.rs b/crates/iceberg/src/metadata_scan.rs index 6205d6e011..fa58acfb72 100644 --- a/crates/iceberg/src/metadata_scan.rs +++ b/crates/iceberg/src/metadata_scan.rs @@ -29,6 +29,10 @@ use crate::table::Table; use crate::Result; /// Table metadata scan. +/// +/// Used to inspect a table's history, snapshots, and other metadata as a table. +/// +/// See also . #[derive(Debug)] pub struct MetadataScan { metadata_ref: TableMetadataRef, @@ -48,6 +52,10 @@ impl MetadataScan { } } +/// Table metadata scan. +/// +/// Use to inspect a table's history, snapshots, and other metadata as a table. +/// /// References: /// - /// - @@ -65,15 +73,6 @@ pub struct SnapshotsTable; impl MetadataTable for SnapshotsTable { fn schema() -> Schema { - // committed_at: timestamp[ms] not null - // snapshot_id: int64 not null - // parent_id: int64 - // operation: string - // manifest_list: string not null - // summary: map - // child 0, entries: struct not null - // child 0, key: string not null - // child 1, value: string Schema::new(vec![ Field::new( "committed_at", @@ -196,8 +195,7 @@ mod tests { #[test] fn test_snapshots_table() { let table = TableTestFixture::new().table; - let scan = MetadataScan::new(&table); - let record_batch = SnapshotsTable::scan(&scan).unwrap(); + let record_batch = table.metadata_scan().snapshots(); check_record_batch( record_batch, expect![[r#" diff --git a/crates/iceberg/src/table.rs b/crates/iceberg/src/table.rs index 6c5823f944..f78e04e2ff 100644 --- a/crates/iceberg/src/table.rs +++ b/crates/iceberg/src/table.rs @@ -201,7 +201,7 @@ impl Table { TableScanBuilder::new(self) } - /// Creates a metadata scan. + /// Creates a metadata scan. See [`MetadataScan`] for more details. pub fn metadata_scan(&self) -> MetadataScan { MetadataScan::new(self) } From fa9d1e3fe26d2e8e88a1c47642b1fc2a309b6758 Mon Sep 17 00:00:00 2001 From: xxchan Date: Wed, 18 Dec 2024 21:34:45 +0800 Subject: [PATCH 03/16] cargo sort Signed-off-by: xxchan --- crates/iceberg/Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/iceberg/Cargo.toml b/crates/iceberg/Cargo.toml index 1b236811a0..7f323722f5 100644 --- a/crates/iceberg/Cargo.toml +++ b/crates/iceberg/Cargo.toml @@ -86,10 +86,10 @@ zstd = { workspace = true } [dev-dependencies] ctor = { workspace = true } +expect-test = { workspace = true } iceberg-catalog-memory = { workspace = true } iceberg_test_utils = { path = "../test_utils", features = ["tests"] } pretty_assertions = { workspace = true } rand = { workspace = true } tempfile = { workspace = true } tera = { workspace = true } -expect-test = { workspace = true } From f40aa87f15653ec70910eb4c1089a1d6529affe2 Mon Sep 17 00:00:00 2001 From: xxchan Date: Wed, 18 Dec 2024 21:47:38 +0800 Subject: [PATCH 04/16] fix test Signed-off-by: xxchan --- crates/iceberg/src/metadata_scan.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/iceberg/src/metadata_scan.rs b/crates/iceberg/src/metadata_scan.rs index fa58acfb72..6e47eb796f 100644 --- a/crates/iceberg/src/metadata_scan.rs +++ b/crates/iceberg/src/metadata_scan.rs @@ -195,7 +195,7 @@ mod tests { #[test] fn test_snapshots_table() { let table = TableTestFixture::new().table; - let record_batch = table.metadata_scan().snapshots(); + let record_batch = table.metadata_scan().snapshots().unwrap(); check_record_batch( record_batch, expect![[r#" From aefea9206f8d555837c1ead7cbdd8154a7c623bb Mon Sep 17 00:00:00 2001 From: Li Yazhou Date: Sun, 29 Dec 2024 16:10:05 +0800 Subject: [PATCH 05/16] add ManifestsTable --- crates/iceberg/src/metadata_scan.rs | 37 ++++++++++++++++++++++++++++- 1 file changed, 36 insertions(+), 1 deletion(-) diff --git a/crates/iceberg/src/metadata_scan.rs b/crates/iceberg/src/metadata_scan.rs index 6e47eb796f..3fbeccfd68 100644 --- a/crates/iceberg/src/metadata_scan.rs +++ b/crates/iceberg/src/metadata_scan.rs @@ -22,7 +22,7 @@ use std::sync::Arc; use arrow_array::builder::{MapBuilder, PrimitiveBuilder, StringBuilder}; use arrow_array::types::{Int64Type, TimestampMillisecondType}; use arrow_array::RecordBatch; -use arrow_schema::{DataType, Field, Schema, TimeUnit}; +use arrow_schema::{DataType, Field, Fields, Schema, TimeUnit}; use crate::spec::TableMetadataRef; use crate::table::Table; @@ -137,6 +137,41 @@ impl MetadataTable for SnapshotsTable { } } +/// Manifests table. +pub struct ManifestsTable; + +impl MetadataTable for ManifestsTable { + fn schema() -> Schema { + Schema::new(vec![ + Field::new("content", DataType::Int8, false), + Field::new("path", DataType::Utf8, false), + Field::new("length", DataType::Int64, false), + Field::new("partition_spec_id", DataType::Int32, false), + Field::new("added_snapshot_id", DataType::Int64, false), + Field::new("added_data_files_count", DataType::Int32, false), + Field::new("existing_data_files_count", DataType::Int32, false), + Field::new("deleted_data_files_count", DataType::Int32, false), + Field::new("added_delete_files_count", DataType::Int32, false), + Field::new("existing_delete_files_count", DataType::Int32, false), + Field::new("deleted_delete_files_count", DataType::Int32, false), + Field::new( + "partition_summaries", + DataType::Struct(Fields::from(vec![ + Field::new("contains_null", DataType::Boolean, false), + Field::new("contains_nan", DataType::Boolean, true), + Field::new("lower_bound", DataType::Utf8, true), + Field::new("upper_bound", DataType::Utf8, true), + ])), + false, + ), + ]) + } + + fn scan(scan: &MetadataScan) -> Result { + Ok(RecordBatch::try_new(Arc::new(Self::schema()), vec![])?.into()) + } +} + #[cfg(test)] mod tests { use expect_test::{expect, Expect}; From 52d099f94e222621b2811e655ee79465518c4af4 Mon Sep 17 00:00:00 2001 From: Li Yazhou Date: Sun, 29 Dec 2024 18:28:14 +0800 Subject: [PATCH 06/16] add basic scan() impl --- crates/iceberg/src/metadata_scan.rs | 87 +++++++++++++++++++++++++++-- 1 file changed, 83 insertions(+), 4 deletions(-) diff --git a/crates/iceberg/src/metadata_scan.rs b/crates/iceberg/src/metadata_scan.rs index 3fbeccfd68..9f7096e907 100644 --- a/crates/iceberg/src/metadata_scan.rs +++ b/crates/iceberg/src/metadata_scan.rs @@ -19,12 +19,14 @@ use std::sync::Arc; -use arrow_array::builder::{MapBuilder, PrimitiveBuilder, StringBuilder}; -use arrow_array::types::{Int64Type, TimestampMillisecondType}; +use arrow_array::builder::{ + BooleanBuilder, ListBuilder, MapBuilder, PrimitiveBuilder, StringBuilder, StructBuilder, +}; +use arrow_array::types::{Int32Type, Int64Type, Int8Type, TimestampMillisecondType}; use arrow_array::RecordBatch; use arrow_schema::{DataType, Field, Fields, Schema, TimeUnit}; -use crate::spec::TableMetadataRef; +use crate::spec::{ManifestFile, TableMetadataRef}; use crate::table::Table; use crate::Result; @@ -168,7 +170,84 @@ impl MetadataTable for ManifestsTable { } fn scan(scan: &MetadataScan) -> Result { - Ok(RecordBatch::try_new(Arc::new(Self::schema()), vec![])?.into()) + let mut content = PrimitiveBuilder::::new(); + let mut path = StringBuilder::new(); + let mut length = PrimitiveBuilder::::new(); + let mut partition_spec_id = PrimitiveBuilder::::new(); + let mut added_snapshot_id = PrimitiveBuilder::::new(); + let mut added_data_files_count = PrimitiveBuilder::::new(); + let mut existing_data_files_count = PrimitiveBuilder::::new(); + let mut deleted_data_files_count = PrimitiveBuilder::::new(); + let mut added_delete_files_count = PrimitiveBuilder::::new(); + let mut existing_delete_files_count = PrimitiveBuilder::::new(); + let mut deleted_delete_files_count = PrimitiveBuilder::::new(); + let mut partition_summaries = ListBuilder::new(StructBuilder::from_fields( + Fields::from(vec![ + Field::new("contains_null", DataType::Boolean, false), + Field::new("contains_nan", DataType::Boolean, true), + Field::new("lower_bound", DataType::Utf8, true), + Field::new("upper_bound", DataType::Utf8, true), + ]), + 0, + )); + + if let Some(snapshot) = scan.metadata_ref.current_snapshot() { + // TODO: load manifest list + let manifests = Vec::::new(); + for manifest in manifests { + content.append_value(manifest.content.clone() as i8); + path.append_value(manifest.manifest_path); + length.append_value(manifest.manifest_length); + partition_spec_id.append_value(manifest.partition_spec_id); + added_snapshot_id.append_value(manifest.added_snapshot_id); + added_data_files_count.append_value(manifest.added_files_count.unwrap_or(0) as i32); + existing_data_files_count + .append_value(manifest.existing_files_count.unwrap_or(0) as i32); + deleted_data_files_count + .append_value(manifest.deleted_files_count.unwrap_or(0) as i32); + added_delete_files_count + .append_value(manifest.added_files_count.unwrap_or(0) as i32); + existing_delete_files_count + .append_value(manifest.existing_files_count.unwrap_or(0) as i32); + deleted_delete_files_count + .append_value(manifest.deleted_files_count.unwrap_or(0) as i32); + let partition_summaries_builder = partition_summaries.values(); + for summary in manifest.partitions { + partition_summaries_builder + .field_builder::(0) + .unwrap() + .append_value(summary.contains_null); + partition_summaries_builder + .field_builder::(1) + .unwrap() + .append_option(summary.contains_nan); + partition_summaries_builder + .field_builder::(2) + .unwrap() + .append_option(summary.lower_bound.as_ref().map(|v| v.to_string())); + partition_summaries_builder + .field_builder::(3) + .unwrap() + .append_option(summary.upper_bound.as_ref().map(|v| v.to_string())); + } + partition_summaries_builder.append(true); + } + } + + Ok(RecordBatch::try_new(Arc::new(Self::schema()), vec![ + Arc::new(content.finish()), + Arc::new(path.finish()), + Arc::new(length.finish()), + Arc::new(partition_spec_id.finish()), + Arc::new(added_snapshot_id.finish()), + Arc::new(added_data_files_count.finish()), + Arc::new(existing_data_files_count.finish()), + Arc::new(deleted_data_files_count.finish()), + Arc::new(added_delete_files_count.finish()), + Arc::new(existing_delete_files_count.finish()), + Arc::new(deleted_delete_files_count.finish()), + Arc::new(partition_summaries.finish()), + ])?) } } From 7a07c300638586805e8db65c0dfaa54f49a32cc6 Mon Sep 17 00:00:00 2001 From: Li Yazhou Date: Sun, 29 Dec 2024 19:15:26 +0800 Subject: [PATCH 07/16] add io and async --- crates/iceberg/src/metadata_scan.rs | 41 +++++++++++++++++++---------- 1 file changed, 27 insertions(+), 14 deletions(-) diff --git a/crates/iceberg/src/metadata_scan.rs b/crates/iceberg/src/metadata_scan.rs index 9f7096e907..f23268304a 100644 --- a/crates/iceberg/src/metadata_scan.rs +++ b/crates/iceberg/src/metadata_scan.rs @@ -25,8 +25,10 @@ use arrow_array::builder::{ use arrow_array::types::{Int32Type, Int64Type, Int8Type, TimestampMillisecondType}; use arrow_array::RecordBatch; use arrow_schema::{DataType, Field, Fields, Schema, TimeUnit}; +use async_trait::async_trait; -use crate::spec::{ManifestFile, TableMetadataRef}; +use crate::io::FileIO; +use crate::spec::TableMetadataRef; use crate::table::Table; use crate::Result; @@ -38,6 +40,7 @@ use crate::Result; #[derive(Debug)] pub struct MetadataScan { metadata_ref: TableMetadataRef, + io: FileIO, } impl MetadataScan { @@ -45,12 +48,18 @@ impl MetadataScan { pub fn new(table: &Table) -> Self { Self { metadata_ref: table.metadata_ref(), + io: table.file_io().clone(), } } /// Returns the snapshots of the table. - pub fn snapshots(&self) -> Result { - SnapshotsTable::scan(self) + pub async fn snapshots(&self) -> Result { + SnapshotsTable::scan(self).await + } + + /// Returns the manifests of the table. + pub async fn manifests(&self) -> Result { + ManifestsTable::scan(self).await } } @@ -62,17 +71,19 @@ impl MetadataScan { /// - /// - /// - +#[async_trait] pub trait MetadataTable { /// Returns the schema of the metadata table. fn schema() -> Schema; /// Scans the metadata table. - fn scan(scan: &MetadataScan) -> Result; + async fn scan(scan: &MetadataScan) -> Result; } /// Snapshots table. pub struct SnapshotsTable; +#[async_trait] impl MetadataTable for SnapshotsTable { fn schema() -> Schema { Schema::new(vec![ @@ -106,7 +117,7 @@ impl MetadataTable for SnapshotsTable { ]) } - fn scan(scan: &MetadataScan) -> Result { + async fn scan(scan: &MetadataScan) -> Result { let mut committed_at = PrimitiveBuilder::::new().with_timezone("+00:00"); let mut snapshot_id = PrimitiveBuilder::::new(); @@ -142,6 +153,7 @@ impl MetadataTable for SnapshotsTable { /// Manifests table. pub struct ManifestsTable; +#[async_trait] impl MetadataTable for ManifestsTable { fn schema() -> Schema { Schema::new(vec![ @@ -169,7 +181,7 @@ impl MetadataTable for ManifestsTable { ]) } - fn scan(scan: &MetadataScan) -> Result { + async fn scan(scan: &MetadataScan) -> Result { let mut content = PrimitiveBuilder::::new(); let mut path = StringBuilder::new(); let mut length = PrimitiveBuilder::::new(); @@ -192,11 +204,12 @@ impl MetadataTable for ManifestsTable { )); if let Some(snapshot) = scan.metadata_ref.current_snapshot() { - // TODO: load manifest list - let manifests = Vec::::new(); - for manifest in manifests { + let manifest_list = snapshot + .load_manifest_list(&scan.io, &scan.metadata_ref) + .await?; + for manifest in manifest_list.entries() { content.append_value(manifest.content.clone() as i8); - path.append_value(manifest.manifest_path); + path.append_value(manifest.manifest_path.clone()); length.append_value(manifest.manifest_length); partition_spec_id.append_value(manifest.partition_spec_id); added_snapshot_id.append_value(manifest.added_snapshot_id); @@ -212,7 +225,7 @@ impl MetadataTable for ManifestsTable { deleted_delete_files_count .append_value(manifest.deleted_files_count.unwrap_or(0) as i32); let partition_summaries_builder = partition_summaries.values(); - for summary in manifest.partitions { + for summary in &manifest.partitions { partition_summaries_builder .field_builder::(0) .unwrap() @@ -306,10 +319,10 @@ mod tests { )); } - #[test] - fn test_snapshots_table() { + #[tokio::test] + async fn test_snapshots_table() { let table = TableTestFixture::new().table; - let record_batch = table.metadata_scan().snapshots().unwrap(); + let record_batch = table.metadata_scan().snapshots().await.unwrap(); check_record_batch( record_batch, expect![[r#" From 382bdf0d7107802771a479ff018b1c1571bd6bf0 Mon Sep 17 00:00:00 2001 From: Li Yazhou Date: Sun, 29 Dec 2024 20:12:22 +0800 Subject: [PATCH 08/16] save --- crates/iceberg/src/metadata_scan.rs | 90 ++++++++++++++++++++++++++--- 1 file changed, 83 insertions(+), 7 deletions(-) diff --git a/crates/iceberg/src/metadata_scan.rs b/crates/iceberg/src/metadata_scan.rs index f23268304a..9c3b60d361 100644 --- a/crates/iceberg/src/metadata_scan.rs +++ b/crates/iceberg/src/metadata_scan.rs @@ -170,12 +170,16 @@ impl MetadataTable for ManifestsTable { Field::new("deleted_delete_files_count", DataType::Int32, false), Field::new( "partition_summaries", - DataType::Struct(Fields::from(vec![ - Field::new("contains_null", DataType::Boolean, false), - Field::new("contains_nan", DataType::Boolean, true), - Field::new("lower_bound", DataType::Utf8, true), - Field::new("upper_bound", DataType::Utf8, true), - ])), + DataType::List(Arc::new(Field::new_struct( + "partition_summary", + vec![ + Field::new("contains_null", DataType::Boolean, false), + Field::new("contains_nan", DataType::Boolean, true), + Field::new("lower_bound", DataType::Utf8, true), + Field::new("upper_bound", DataType::Utf8, true), + ], + false, + ))), false, ), ]) @@ -224,6 +228,7 @@ impl MetadataTable for ManifestsTable { .append_value(manifest.existing_files_count.unwrap_or(0) as i32); deleted_delete_files_count .append_value(manifest.deleted_files_count.unwrap_or(0) as i32); + let partition_summaries_builder = partition_summaries.values(); for summary in &manifest.partitions { partition_summaries_builder @@ -242,8 +247,9 @@ impl MetadataTable for ManifestsTable { .field_builder::(3) .unwrap() .append_option(summary.upper_bound.as_ref().map(|v| v.to_string())); + partition_summaries_builder.append(true); } - partition_summaries_builder.append(true); + partition_summaries.append(true); } } @@ -389,4 +395,74 @@ mod tests { Some("committed_at"), ); } + + #[tokio::test] + async fn test_manifests_table() { + let mut fixture = TableTestFixture::new(); + fixture.setup_manifest_files().await; + + let record_batch = fixture.table.metadata_scan().manifests().await.unwrap(); + check_record_batch( + record_batch, + expect![[r#" + Field { name: "content", data_type: Int8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, + Field { name: "path", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, + Field { name: "length", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, + Field { name: "partition_spec_id", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, + Field { name: "added_snapshot_id", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, + Field { name: "added_data_files_count", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, + Field { name: "existing_data_files_count", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, + Field { name: "deleted_data_files_count", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, + Field { name: "added_delete_files_count", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, + Field { name: "existing_delete_files_count", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, + Field { name: "deleted_delete_files_count", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }"#]], + expect![[r#" + content: PrimitiveArray + [ + 0, + ], + path: StringArray + [ + "/var/folders/tz/9f04ptmx4892t1p2bfjbvkdw0000gn/T/.tmp8J8I0l/table1/metadata/manifest_bb9125ce-f386-4c12-a99f-64646deff6a7.avro", + ], + length: PrimitiveArray + [ + 3842, + ], + partition_spec_id: PrimitiveArray + [ + 0, + ], + added_snapshot_id: PrimitiveArray + [ + 3055729675574597004, + ], + added_data_files_count: PrimitiveArray + [ + 1, + ], + existing_data_files_count: PrimitiveArray + [ + 1, + ], + deleted_data_files_count: PrimitiveArray + [ + 1, + ], + added_delete_files_count: PrimitiveArray + [ + 1, + ], + existing_delete_files_count: PrimitiveArray + [ + 1, + ], + deleted_delete_files_count: PrimitiveArray + [ + 1, + ]"#]], + &[], + Some("path"), + ); + } } From a7a942e9af5455d8682e3bdc6ed4cdf497ad95bf Mon Sep 17 00:00:00 2001 From: Li Yazhou Date: Sun, 29 Dec 2024 20:37:08 +0800 Subject: [PATCH 09/16] pass the tests --- crates/iceberg/src/metadata_scan.rs | 69 ++++++++++++++++++++++------- crates/iceberg/src/scan.rs | 2 +- 2 files changed, 54 insertions(+), 17 deletions(-) diff --git a/crates/iceberg/src/metadata_scan.rs b/crates/iceberg/src/metadata_scan.rs index 9c3b60d361..c5a04b5b8a 100644 --- a/crates/iceberg/src/metadata_scan.rs +++ b/crates/iceberg/src/metadata_scan.rs @@ -153,6 +153,17 @@ impl MetadataTable for SnapshotsTable { /// Manifests table. pub struct ManifestsTable; +impl ManifestsTable { + fn partition_summary_fields() -> Vec { + vec![ + Field::new("contains_null", DataType::Boolean, false), + Field::new("contains_nan", DataType::Boolean, true), + Field::new("lower_bound", DataType::Utf8, true), + Field::new("upper_bound", DataType::Utf8, true), + ] + } +} + #[async_trait] impl MetadataTable for ManifestsTable { fn schema() -> Schema { @@ -171,13 +182,8 @@ impl MetadataTable for ManifestsTable { Field::new( "partition_summaries", DataType::List(Arc::new(Field::new_struct( - "partition_summary", - vec![ - Field::new("contains_null", DataType::Boolean, false), - Field::new("contains_nan", DataType::Boolean, true), - Field::new("lower_bound", DataType::Utf8, true), - Field::new("upper_bound", DataType::Utf8, true), - ], + "item", + ManifestsTable::partition_summary_fields(), false, ))), false, @@ -198,14 +204,14 @@ impl MetadataTable for ManifestsTable { let mut existing_delete_files_count = PrimitiveBuilder::::new(); let mut deleted_delete_files_count = PrimitiveBuilder::::new(); let mut partition_summaries = ListBuilder::new(StructBuilder::from_fields( - Fields::from(vec![ - Field::new("contains_null", DataType::Boolean, false), - Field::new("contains_nan", DataType::Boolean, true), - Field::new("lower_bound", DataType::Utf8, true), - Field::new("upper_bound", DataType::Utf8, true), - ]), + Fields::from(ManifestsTable::partition_summary_fields()), 0, - )); + )) + .with_field(Arc::new(Field::new_struct( + "item", + ManifestsTable::partition_summary_fields(), + false, + ))); if let Some(snapshot) = scan.metadata_ref.current_snapshot() { let manifest_list = snapshot @@ -415,7 +421,8 @@ mod tests { Field { name: "deleted_data_files_count", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "added_delete_files_count", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "existing_delete_files_count", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, - Field { name: "deleted_delete_files_count", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }"#]], + Field { name: "deleted_delete_files_count", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, + Field { name: "partition_summaries", data_type: List(Field { name: "item", data_type: Struct([Field { name: "contains_null", data_type: Boolean, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "contains_nan", data_type: Boolean, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "lower_bound", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "upper_bound", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }]), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }"#]], expect![[r#" content: PrimitiveArray [ @@ -423,7 +430,7 @@ mod tests { ], path: StringArray [ - "/var/folders/tz/9f04ptmx4892t1p2bfjbvkdw0000gn/T/.tmp8J8I0l/table1/metadata/manifest_bb9125ce-f386-4c12-a99f-64646deff6a7.avro", + "/var/folders/tz/9f04ptmx4892t1p2bfjbvkdw0000gn/T/.tmpTrYrv3/table1/metadata/manifest_baae3b52-80ca-4c50-a85e-667753075ea7.avro", ], length: PrimitiveArray [ @@ -460,6 +467,36 @@ mod tests { deleted_delete_files_count: PrimitiveArray [ 1, + ], + partition_summaries: ListArray + [ + StructArray + -- validity: + [ + valid, + ] + [ + -- child 0: "contains_null" (Boolean) + BooleanArray + [ + false, + ] + -- child 1: "contains_nan" (Boolean) + BooleanArray + [ + false, + ] + -- child 2: "lower_bound" (Utf8) + StringArray + [ + "100", + ] + -- child 3: "upper_bound" (Utf8) + StringArray + [ + "300", + ] + ], ]"#]], &[], Some("path"), diff --git a/crates/iceberg/src/scan.rs b/crates/iceberg/src/scan.rs index cb3e5d8c8b..5a97e74e70 100644 --- a/crates/iceberg/src/scan.rs +++ b/crates/iceberg/src/scan.rs @@ -1050,7 +1050,7 @@ pub mod tests { .unwrap() } - async fn setup_manifest_files(&mut self) { + pub async fn setup_manifest_files(&mut self) { let current_snapshot = self.table.metadata().current_snapshot().unwrap(); let parent_snapshot = current_snapshot .parent_snapshot(self.table.metadata()) From 8626d4394e800116e650403bf9a54f3ea8f7152f Mon Sep 17 00:00:00 2001 From: Li Yazhou Date: Sun, 29 Dec 2024 21:49:56 +0800 Subject: [PATCH 10/16] use deterministic fixture --- crates/iceberg/src/metadata_scan.rs | 5 +++-- crates/iceberg/src/scan.rs | 23 +++++++++++++++++++++-- 2 files changed, 24 insertions(+), 4 deletions(-) diff --git a/crates/iceberg/src/metadata_scan.rs b/crates/iceberg/src/metadata_scan.rs index c5a04b5b8a..943b0b7b75 100644 --- a/crates/iceberg/src/metadata_scan.rs +++ b/crates/iceberg/src/metadata_scan.rs @@ -404,10 +404,11 @@ mod tests { #[tokio::test] async fn test_manifests_table() { - let mut fixture = TableTestFixture::new(); + let mut fixture = TableTestFixture::new().with_rand_seed(2); fixture.setup_manifest_files().await; let record_batch = fixture.table.metadata_scan().manifests().await.unwrap(); + check_record_batch( record_batch, expect![[r#" @@ -430,7 +431,7 @@ mod tests { ], path: StringArray [ - "/var/folders/tz/9f04ptmx4892t1p2bfjbvkdw0000gn/T/.tmpTrYrv3/table1/metadata/manifest_baae3b52-80ca-4c50-a85e-667753075ea7.avro", + "/var/folders/tz/9f04ptmx4892t1p2bfjbvkdw0000gn/T/.tmpRre0Gz/table1/metadata/manifest_1fb13e41-bf3b-422c-b258-253d997458e3.avro", ], length: PrimitiveArray [ diff --git a/crates/iceberg/src/scan.rs b/crates/iceberg/src/scan.rs index 5a97e74e70..73498c297e 100644 --- a/crates/iceberg/src/scan.rs +++ b/crates/iceberg/src/scan.rs @@ -974,9 +974,10 @@ pub mod tests { use parquet::arrow::{ArrowWriter, PARQUET_FIELD_ID_META_KEY}; use parquet::basic::Compression; use parquet::file::properties::WriterProperties; + use rand::rngs::StdRng; + use rand::{Rng, SeedableRng}; use tempfile::TempDir; use tera::{Context, Tera}; - use uuid::Uuid; use crate::arrow::ArrowReaderBuilder; use crate::expr::{BoundPredicate, Reference}; @@ -992,6 +993,7 @@ pub mod tests { pub struct TableTestFixture { table_location: String, + rand_seed: u64, pub table: Table, } @@ -1033,19 +1035,36 @@ pub mod tests { .build() .unwrap(); + let rand_seed = rand::thread_rng().gen_range(1..=100); Self { table_location: table_location.to_str().unwrap().to_string(), + rand_seed, table, } } + pub fn with_rand_seed(mut self, seed: u64) -> Self { + self.rand_seed = seed; + self + } + + fn rand_uuid(&self) -> String { + let mut seeded_rng = StdRng::seed_from_u64(self.rand_seed); + let rand_buf = seeded_rng.gen::<[u8; 16]>(); + let mut builder = uuid::Builder::from_bytes(rand_buf); + builder + .set_version(uuid::Version::Random) + .as_uuid() + .to_string() + } + fn next_manifest_file(&self) -> OutputFile { self.table .file_io() .new_output(format!( "{}/metadata/manifest_{}.avro", self.table_location, - Uuid::new_v4() + self.rand_uuid() )) .unwrap() } From 49639ed63c2e4826aa56a1597685d19f094eed0e Mon Sep 17 00:00:00 2001 From: Li Yazhou Date: Sun, 29 Dec 2024 22:17:34 +0800 Subject: [PATCH 11/16] hack expect-test --- crates/iceberg/src/metadata_scan.rs | 12 ++++++++++-- crates/iceberg/src/scan.rs | 23 ++--------------------- 2 files changed, 12 insertions(+), 23 deletions(-) diff --git a/crates/iceberg/src/metadata_scan.rs b/crates/iceberg/src/metadata_scan.rs index 943b0b7b75..378cfbac00 100644 --- a/crates/iceberg/src/metadata_scan.rs +++ b/crates/iceberg/src/metadata_scan.rs @@ -278,6 +278,7 @@ impl MetadataTable for ManifestsTable { #[cfg(test)] mod tests { + use arrow_array::StringArray; use expect_test::{expect, Expect}; use itertools::Itertools; @@ -404,10 +405,17 @@ mod tests { #[tokio::test] async fn test_manifests_table() { - let mut fixture = TableTestFixture::new().with_rand_seed(2); + let mut fixture = TableTestFixture::new(); fixture.setup_manifest_files().await; + // the path column is not deterministic, we need to set it to a fixed value to make this expect test possible let record_batch = fixture.table.metadata_scan().manifests().await.unwrap(); + let schema = record_batch.schema(); + let mut columns = record_batch.columns().to_vec(); + columns[1] = Arc::new(StringArray::from(vec![ + "/tmp/table1/metadata/manifest_1fb13e41-bf3b-422c-b258-253d997458e3.avro", + ])); + let record_batch = RecordBatch::try_new(schema, columns).unwrap(); check_record_batch( record_batch, @@ -431,7 +439,7 @@ mod tests { ], path: StringArray [ - "/var/folders/tz/9f04ptmx4892t1p2bfjbvkdw0000gn/T/.tmpRre0Gz/table1/metadata/manifest_1fb13e41-bf3b-422c-b258-253d997458e3.avro", + "/tmp/table1/metadata/manifest_1fb13e41-bf3b-422c-b258-253d997458e3.avro", ], length: PrimitiveArray [ diff --git a/crates/iceberg/src/scan.rs b/crates/iceberg/src/scan.rs index 73498c297e..5a97e74e70 100644 --- a/crates/iceberg/src/scan.rs +++ b/crates/iceberg/src/scan.rs @@ -974,10 +974,9 @@ pub mod tests { use parquet::arrow::{ArrowWriter, PARQUET_FIELD_ID_META_KEY}; use parquet::basic::Compression; use parquet::file::properties::WriterProperties; - use rand::rngs::StdRng; - use rand::{Rng, SeedableRng}; use tempfile::TempDir; use tera::{Context, Tera}; + use uuid::Uuid; use crate::arrow::ArrowReaderBuilder; use crate::expr::{BoundPredicate, Reference}; @@ -993,7 +992,6 @@ pub mod tests { pub struct TableTestFixture { table_location: String, - rand_seed: u64, pub table: Table, } @@ -1035,36 +1033,19 @@ pub mod tests { .build() .unwrap(); - let rand_seed = rand::thread_rng().gen_range(1..=100); Self { table_location: table_location.to_str().unwrap().to_string(), - rand_seed, table, } } - pub fn with_rand_seed(mut self, seed: u64) -> Self { - self.rand_seed = seed; - self - } - - fn rand_uuid(&self) -> String { - let mut seeded_rng = StdRng::seed_from_u64(self.rand_seed); - let rand_buf = seeded_rng.gen::<[u8; 16]>(); - let mut builder = uuid::Builder::from_bytes(rand_buf); - builder - .set_version(uuid::Version::Random) - .as_uuid() - .to_string() - } - fn next_manifest_file(&self) -> OutputFile { self.table .file_io() .new_output(format!( "{}/metadata/manifest_{}.avro", self.table_location, - self.rand_uuid() + Uuid::new_v4() )) .unwrap() } From fc780a270c571a7bacdca71a2450255b87619467 Mon Sep 17 00:00:00 2001 From: Li Yazhou Date: Sun, 29 Dec 2024 22:35:01 +0800 Subject: [PATCH 12/16] skip path & length --- crates/iceberg/src/metadata_scan.rs | 20 +++----------------- 1 file changed, 3 insertions(+), 17 deletions(-) diff --git a/crates/iceberg/src/metadata_scan.rs b/crates/iceberg/src/metadata_scan.rs index 378cfbac00..bc6129860a 100644 --- a/crates/iceberg/src/metadata_scan.rs +++ b/crates/iceberg/src/metadata_scan.rs @@ -278,7 +278,6 @@ impl MetadataTable for ManifestsTable { #[cfg(test)] mod tests { - use arrow_array::StringArray; use expect_test::{expect, Expect}; use itertools::Itertools; @@ -408,14 +407,7 @@ mod tests { let mut fixture = TableTestFixture::new(); fixture.setup_manifest_files().await; - // the path column is not deterministic, we need to set it to a fixed value to make this expect test possible let record_batch = fixture.table.metadata_scan().manifests().await.unwrap(); - let schema = record_batch.schema(); - let mut columns = record_batch.columns().to_vec(); - columns[1] = Arc::new(StringArray::from(vec![ - "/tmp/table1/metadata/manifest_1fb13e41-bf3b-422c-b258-253d997458e3.avro", - ])); - let record_batch = RecordBatch::try_new(schema, columns).unwrap(); check_record_batch( record_batch, @@ -437,14 +429,8 @@ mod tests { [ 0, ], - path: StringArray - [ - "/tmp/table1/metadata/manifest_1fb13e41-bf3b-422c-b258-253d997458e3.avro", - ], - length: PrimitiveArray - [ - 3842, - ], + path: (skipped), + length: (skipped), partition_spec_id: PrimitiveArray [ 0, @@ -507,7 +493,7 @@ mod tests { ] ], ]"#]], - &[], + &["path", "length"], Some("path"), ); } From 0dd66785f190a3d01a2f7fd568bc0926f0df74cd Mon Sep 17 00:00:00 2001 From: Li Yazhou Date: Tue, 31 Dec 2024 18:32:25 +0800 Subject: [PATCH 13/16] fix clippy --- crates/iceberg/src/metadata_scan.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/iceberg/src/metadata_scan.rs b/crates/iceberg/src/metadata_scan.rs index 287e82f111..a2391af7f9 100644 --- a/crates/iceberg/src/metadata_scan.rs +++ b/crates/iceberg/src/metadata_scan.rs @@ -203,7 +203,7 @@ impl<'a> ManifestsTable<'a> { if let Some(snapshot) = self.metadata_table.metadata().current_snapshot() { let manifest_list = snapshot .load_manifest_list( - &self.metadata_table.0.file_io(), + self.metadata_table.0.file_io(), &self.metadata_table.0.metadata_ref(), ) .await?; From 83e88114beef47cc438f90ae4247b3afea904d46 Mon Sep 17 00:00:00 2001 From: Li Yazhou Date: Wed, 1 Jan 2025 13:16:20 +0800 Subject: [PATCH 14/16] exposes schema() --- crates/iceberg/src/metadata_scan.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/crates/iceberg/src/metadata_scan.rs b/crates/iceberg/src/metadata_scan.rs index a2391af7f9..4227e83127 100644 --- a/crates/iceberg/src/metadata_scan.rs +++ b/crates/iceberg/src/metadata_scan.rs @@ -152,7 +152,8 @@ impl<'a> ManifestsTable<'a> { ] } - fn schema(&self) -> Schema { + /// Returns the schema of the manifests table. + pub fn schema(&self) -> Schema { Schema::new(vec![ Field::new("content", DataType::Int8, false), Field::new("path", DataType::Utf8, false), From 4c6e3383cee4d727abcfe7df870d57f723504be5 Mon Sep 17 00:00:00 2001 From: Li Yazhou Date: Wed, 1 Jan 2025 13:17:18 +0800 Subject: [PATCH 15/16] derive Copy in ManifestContentType --- crates/iceberg/src/metadata_scan.rs | 2 +- crates/iceberg/src/spec/manifest_list.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/crates/iceberg/src/metadata_scan.rs b/crates/iceberg/src/metadata_scan.rs index 4227e83127..4737ee8533 100644 --- a/crates/iceberg/src/metadata_scan.rs +++ b/crates/iceberg/src/metadata_scan.rs @@ -209,7 +209,7 @@ impl<'a> ManifestsTable<'a> { ) .await?; for manifest in manifest_list.entries() { - content.append_value(manifest.content.clone() as i8); + content.append_value(manifest.content as i8); path.append_value(manifest.manifest_path.clone()); length.append_value(manifest.manifest_length); partition_spec_id.append_value(manifest.partition_spec_id); diff --git a/crates/iceberg/src/spec/manifest_list.rs b/crates/iceberg/src/spec/manifest_list.rs index 97d259ad35..4618c8a4fa 100644 --- a/crates/iceberg/src/spec/manifest_list.rs +++ b/crates/iceberg/src/spec/manifest_list.rs @@ -597,7 +597,7 @@ impl ManifestFile { } /// The type of files tracked by the manifest, either data or delete files; Data(0) for all v1 manifests -#[derive(Debug, PartialEq, Clone, Eq)] +#[derive(Debug, PartialEq, Clone, Copy, Eq)] pub enum ManifestContentType { /// The manifest content is data. Data = 0, From 9fe6bd0fc5d784e24a49a5147ebdadf4f2e22507 Mon Sep 17 00:00:00 2001 From: Li Yazhou Date: Wed, 1 Jan 2025 13:19:55 +0800 Subject: [PATCH 16/16] use .table instead of .metadat_table --- crates/iceberg/src/metadata_scan.rs | 26 +++++++------------------- 1 file changed, 7 insertions(+), 19 deletions(-) diff --git a/crates/iceberg/src/metadata_scan.rs b/crates/iceberg/src/metadata_scan.rs index 4737ee8533..16604d781f 100644 --- a/crates/iceberg/src/metadata_scan.rs +++ b/crates/iceberg/src/metadata_scan.rs @@ -26,7 +26,6 @@ use arrow_array::types::{Int32Type, Int64Type, Int8Type, TimestampMillisecondTyp use arrow_array::RecordBatch; use arrow_schema::{DataType, Field, Fields, Schema, TimeUnit}; -use crate::spec::TableMetadata; use crate::table::Table; use crate::Result; @@ -47,26 +46,18 @@ impl MetadataTable { /// Get the snapshots table. pub fn snapshots(&self) -> SnapshotsTable { - SnapshotsTable { - metadata_table: self, - } + SnapshotsTable { table: &self.0 } } /// Get the manifests table. pub fn manifests(&self) -> ManifestsTable { - ManifestsTable { - metadata_table: self, - } - } - - fn metadata(&self) -> &TableMetadata { - self.0.metadata() + ManifestsTable { table: &self.0 } } } /// Snapshots table. pub struct SnapshotsTable<'a> { - metadata_table: &'a MetadataTable, + table: &'a Table, } impl<'a> SnapshotsTable<'a> { @@ -113,7 +104,7 @@ impl<'a> SnapshotsTable<'a> { let mut manifest_list = StringBuilder::new(); let mut summary = MapBuilder::new(None, StringBuilder::new(), StringBuilder::new()); - for snapshot in self.metadata_table.metadata().snapshots() { + for snapshot in self.table.metadata().snapshots() { committed_at.append_value(snapshot.timestamp_ms()); snapshot_id.append_value(snapshot.snapshot_id()); parent_id.append_option(snapshot.parent_snapshot_id()); @@ -139,7 +130,7 @@ impl<'a> SnapshotsTable<'a> { /// Manifests table. pub struct ManifestsTable<'a> { - metadata_table: &'a MetadataTable, + table: &'a Table, } impl<'a> ManifestsTable<'a> { @@ -201,12 +192,9 @@ impl<'a> ManifestsTable<'a> { false, ))); - if let Some(snapshot) = self.metadata_table.metadata().current_snapshot() { + if let Some(snapshot) = self.table.metadata().current_snapshot() { let manifest_list = snapshot - .load_manifest_list( - self.metadata_table.0.file_io(), - &self.metadata_table.0.metadata_ref(), - ) + .load_manifest_list(self.table.file_io(), &self.table.metadata_ref()) .await?; for manifest in manifest_list.entries() { content.append_value(manifest.content as i8);