Skip to content

Commit 186f075

Browse files
committed
feat: support metadata table "snapshots"
Signed-off-by: xxchan <[email protected]>
1 parent 74a85e7 commit 186f075

File tree

8 files changed

+294
-5
lines changed

8 files changed

+294
-5
lines changed

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,3 +100,4 @@ volo-thrift = "0.10"
100100
hive_metastore = "0.1"
101101
tera = "1"
102102
zstd = "0.13.2"
103+
expect-test = "1"

crates/iceberg/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,3 +92,4 @@ pretty_assertions = { workspace = true }
9292
rand = { workspace = true }
9393
tempfile = { workspace = true }
9494
tera = { workspace = true }
95+
expect-test = { workspace = true }

crates/iceberg/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@ mod avro;
7373
pub mod io;
7474
pub mod spec;
7575

76+
pub mod metadata_scan;
7677
pub mod scan;
7778

7879
pub mod expr;
Lines changed: 267 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,267 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
//! Metadata table API.
19+
20+
use std::sync::Arc;
21+
22+
use arrow_array::builder::{MapBuilder, PrimitiveBuilder, StringBuilder};
23+
use arrow_array::types::{Int64Type, TimestampMillisecondType};
24+
use arrow_array::RecordBatch;
25+
use arrow_schema::{DataType, Field, Schema, TimeUnit};
26+
27+
use crate::spec::TableMetadataRef;
28+
use crate::table::Table;
29+
use crate::Result;
30+
31+
/// Table metadata scan.
32+
#[derive(Debug)]
33+
pub struct MetadataScan {
34+
metadata_ref: TableMetadataRef,
35+
}
36+
37+
impl MetadataScan {
38+
/// Creates a new metadata scan.
39+
pub fn new(table: &Table) -> Self {
40+
Self {
41+
metadata_ref: table.metadata_ref(),
42+
}
43+
}
44+
45+
/// Returns the snapshots of the table.
46+
pub fn snapshots(&self) -> Result<RecordBatch> {
47+
SnapshotsTable::scan(self)
48+
}
49+
}
50+
51+
/// References:
52+
/// - <https://github.com/apache/iceberg/blob/ac865e334e143dfd9e33011d8cf710b46d91f1e5/core/src/main/java/org/apache/iceberg/MetadataTableType.java#L23-L39>
53+
/// - <https://iceberg.apache.org/docs/latest/spark-queries/#querying-with-sql>
54+
/// - <https://py.iceberg.apache.org/api/#inspecting-tables>
55+
pub trait MetadataTable {
56+
/// Returns the schema of the metadata table.
57+
fn schema() -> Schema;
58+
59+
/// Scans the metadata table.
60+
fn scan(scan: &MetadataScan) -> Result<RecordBatch>;
61+
}
62+
63+
/// Snapshots metadata table.
///
/// A field-less marker type: the table's schema and contents are provided through its
/// [`MetadataTable`] implementation.
#[derive(Debug)]
pub struct SnapshotsTable;
65+
66+
impl MetadataTable for SnapshotsTable {
67+
fn schema() -> Schema {
68+
// committed_at: timestamp[ms] not null
69+
// snapshot_id: int64 not null
70+
// parent_id: int64
71+
// operation: string
72+
// manifest_list: string not null
73+
// summary: map<string, string>
74+
// child 0, entries: struct<key: string not null, value: string> not null
75+
// child 0, key: string not null
76+
// child 1, value: string
77+
Schema::new(vec![
78+
Field::new(
79+
"committed_at",
80+
DataType::Timestamp(TimeUnit::Millisecond, Some("+00:00".into())),
81+
false,
82+
),
83+
Field::new("snapshot_id", DataType::Int64, false),
84+
Field::new("parent_id", DataType::Int64, true),
85+
Field::new("operation", DataType::Utf8, false),
86+
Field::new("manifest_list", DataType::Utf8, false),
87+
Field::new(
88+
"summary",
89+
DataType::Map(
90+
Arc::new(Field::new(
91+
"entries",
92+
DataType::Struct(
93+
vec![
94+
Field::new("keys", DataType::Utf8, false),
95+
Field::new("values", DataType::Utf8, true),
96+
]
97+
.into(),
98+
),
99+
false,
100+
)),
101+
false,
102+
),
103+
false,
104+
),
105+
])
106+
}
107+
108+
fn scan(scan: &MetadataScan) -> Result<RecordBatch> {
109+
let mut committed_at =
110+
PrimitiveBuilder::<TimestampMillisecondType>::new().with_timezone("+00:00");
111+
let mut snapshot_id = PrimitiveBuilder::<Int64Type>::new();
112+
let mut parent_id = PrimitiveBuilder::<Int64Type>::new();
113+
let mut operation = StringBuilder::new();
114+
let mut manifest_list = StringBuilder::new();
115+
let mut summary = MapBuilder::new(None, StringBuilder::new(), StringBuilder::new());
116+
117+
for snapshot in scan.metadata_ref.snapshots() {
118+
committed_at.append_value(snapshot.timestamp_ms());
119+
snapshot_id.append_value(snapshot.snapshot_id());
120+
parent_id.append_option(snapshot.parent_snapshot_id());
121+
manifest_list.append_value(snapshot.manifest_list());
122+
operation.append_value(snapshot.summary().operation.as_str());
123+
for (key, value) in &snapshot.summary().additional_properties {
124+
summary.keys().append_value(key);
125+
summary.values().append_value(value);
126+
}
127+
summary.append(true)?;
128+
}
129+
130+
Ok(RecordBatch::try_new(Arc::new(Self::schema()), vec![
131+
Arc::new(committed_at.finish()),
132+
Arc::new(snapshot_id.finish()),
133+
Arc::new(parent_id.finish()),
134+
Arc::new(operation.finish()),
135+
Arc::new(manifest_list.finish()),
136+
Arc::new(summary.finish()),
137+
])?)
138+
}
139+
}
140+
141+
#[cfg(test)]
142+
mod tests {
143+
use expect_test::{expect, Expect};
144+
use itertools::Itertools;
145+
146+
use super::*;
147+
use crate::scan::tests::TableTestFixture;
148+
149+
/// Snapshot testing to check the resulting record batch.
150+
///
151+
/// - `expected_schema/data`: put `expect![[""]]` as a placeholder,
152+
/// and then run test with `UPDATE_EXPECT=1 cargo test` to automatically update the result,
153+
/// or use rust-analyzer (see [video](https://github.com/rust-analyzer/expect-test)).
154+
/// Check the doc of [`expect_test`] for more details.
155+
/// - `ignore_check_columns`: Some columns are not stable, so we can skip them.
156+
/// - `sort_column`: The order of the data might be non-deterministic, so we can sort it by a column.
157+
fn check_record_batch(
158+
record_batch: RecordBatch,
159+
expected_schema: Expect,
160+
expected_data: Expect,
161+
ignore_check_columns: &[&str],
162+
sort_column: Option<&str>,
163+
) {
164+
let mut columns = record_batch.columns().to_vec();
165+
if let Some(sort_column) = sort_column {
166+
let column = record_batch.column_by_name(sort_column).unwrap();
167+
let indices = arrow_ord::sort::sort_to_indices(column, None, None).unwrap();
168+
columns = columns
169+
.iter()
170+
.map(|column| arrow_select::take::take(column.as_ref(), &indices, None).unwrap())
171+
.collect_vec();
172+
}
173+
174+
expected_schema.assert_eq(&format!(
175+
"{}",
176+
record_batch.schema().fields().iter().format(",\n")
177+
));
178+
expected_data.assert_eq(&format!(
179+
"{}",
180+
record_batch
181+
.schema()
182+
.fields()
183+
.iter()
184+
.zip_eq(columns)
185+
.map(|(field, column)| {
186+
if ignore_check_columns.contains(&field.name().as_str()) {
187+
format!("{}: (skipped)", field.name())
188+
} else {
189+
format!("{}: {:?}", field.name(), column)
190+
}
191+
})
192+
.format(",\n")
193+
));
194+
}
195+
196+
/// Snapshot test for `SnapshotsTable::scan` run against the `TableTestFixture` table.
///
/// The first `expect!` holds the expected schema rendering and the second the expected
/// column contents. The `manifest_list` column is skipped in the comparison and rows are
/// sorted by `committed_at` before rendering (see the last two arguments).
#[test]
197+
fn test_snapshots_table() {
198+
let table = TableTestFixture::new().table;
199+
let scan = MetadataScan::new(&table);
200+
let record_batch = SnapshotsTable::scan(&scan).unwrap();
201+
check_record_batch(
202+
record_batch,
203+
expect![[r#"
204+
Field { name: "committed_at", data_type: Timestamp(Millisecond, Some("+00:00")), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} },
205+
Field { name: "snapshot_id", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} },
206+
Field { name: "parent_id", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} },
207+
Field { name: "operation", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} },
208+
Field { name: "manifest_list", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} },
209+
Field { name: "summary", data_type: Map(Field { name: "entries", data_type: Struct([Field { name: "keys", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "values", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }]), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, false), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }"#]],
210+
expect![[r#"
211+
committed_at: PrimitiveArray<Timestamp(Millisecond, Some("+00:00"))>
212+
[
213+
2018-01-04T21:22:35.770+00:00,
214+
2019-04-12T20:29:15.770+00:00,
215+
],
216+
snapshot_id: PrimitiveArray<Int64>
217+
[
218+
3051729675574597004,
219+
3055729675574597004,
220+
],
221+
parent_id: PrimitiveArray<Int64>
222+
[
223+
null,
224+
3051729675574597004,
225+
],
226+
operation: StringArray
227+
[
228+
"append",
229+
"append",
230+
],
231+
manifest_list: (skipped),
232+
summary: MapArray
233+
[
234+
StructArray
235+
-- validity:
236+
[
237+
]
238+
[
239+
-- child 0: "keys" (Utf8)
240+
StringArray
241+
[
242+
]
243+
-- child 1: "values" (Utf8)
244+
StringArray
245+
[
246+
]
247+
],
248+
StructArray
249+
-- validity:
250+
[
251+
]
252+
[
253+
-- child 0: "keys" (Utf8)
254+
StringArray
255+
[
256+
]
257+
-- child 1: "values" (Utf8)
258+
StringArray
259+
[
260+
]
261+
],
262+
]"#]],
263+
&["manifest_list"],
264+
Some("committed_at"),
265+
);
266+
}
267+
}

crates/iceberg/src/scan.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -952,7 +952,7 @@ impl FileScanTask {
952952
}
953953

954954
#[cfg(test)]
955-
mod tests {
955+
pub mod tests {
956956
use std::collections::HashMap;
957957
use std::fs;
958958
use std::fs::File;
@@ -981,13 +981,13 @@ mod tests {
981981
use crate::table::Table;
982982
use crate::TableIdent;
983983

984-
struct TableTestFixture {
984+
pub struct TableTestFixture {
985985
table_location: String,
986-
table: Table,
986+
pub table: Table,
987987
}
988988

989989
impl TableTestFixture {
990-
fn new() -> Self {
990+
pub fn new() -> Self {
991991
let tmp_dir = TempDir::new().unwrap();
992992
let table_location = tmp_dir.path().join("table1");
993993
let manifest_list1_location = table_location.join("metadata/manifests_list_1.avro");

crates/iceberg/src/spec/snapshot.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,18 @@ pub enum Operation {
5252
Delete,
5353
}
5454

55+
impl Operation {
56+
/// Returns the string representation (lowercase) of the operation.
57+
pub fn as_str(&self) -> &str {
58+
match self {
59+
Operation::Append => "append",
60+
Operation::Replace => "replace",
61+
Operation::Overwrite => "overwrite",
62+
Operation::Delete => "delete",
63+
}
64+
}
65+
}
66+
5567
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
5668
/// Summarises the changes in the snapshot.
5769
pub struct Summary {

crates/iceberg/src/table.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ use std::sync::Arc;
2222
use crate::arrow::ArrowReaderBuilder;
2323
use crate::io::object_cache::ObjectCache;
2424
use crate::io::FileIO;
25+
use crate::metadata_scan::MetadataScan;
2526
use crate::scan::TableScanBuilder;
2627
use crate::spec::{TableMetadata, TableMetadataRef};
2728
use crate::{Error, ErrorKind, Result, TableIdent};
@@ -200,6 +201,11 @@ impl Table {
200201
TableScanBuilder::new(self)
201202
}
202203

204+
/// Creates a metadata scan.
205+
pub fn metadata_scan(&self) -> MetadataScan {
206+
MetadataScan::new(self)
207+
}
208+
203209
/// Returns the flag indicating whether the `Table` is readonly or not
204210
pub fn readonly(&self) -> bool {
205211
self.readonly
@@ -292,7 +298,7 @@ impl StaticTable {
292298
}
293299

294300
#[cfg(test)]
295-
mod tests {
301+
mod tests {
296302
use super::*;
297303

298304
#[tokio::test]

crates/test_utils/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ mod common {
3232
use std::sync::Once;
3333

3434
static INIT: Once = Once::new();
35+
/// Initialize logger, etc.
3536
pub fn set_up() {
3637
INIT.call_once(env_logger::init);
3738
}

0 commit comments

Comments
 (0)