Skip to content

Commit 6e64d88

Browse files
committed
Add History metadata table
1 parent 09fa1fa commit 6e64d88

File tree

1 file changed

+141
-1
lines changed

1 file changed

+141
-1
lines changed

crates/iceberg/src/metadata_scan.rs

Lines changed: 141 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
//! Metadata table api.
1919
20+
use std::collections::HashSet;
2021
use std::sync::Arc;
2122

2223
use arrow_array::builder::{
@@ -26,6 +27,7 @@ use arrow_array::types::{Int32Type, Int64Type, Int8Type, TimestampMillisecondTyp
2627
use arrow_array::RecordBatch;
2728
use arrow_schema::{DataType, Field, Fields, Schema, TimeUnit};
2829

30+
use crate::spec::{Snapshot, TableMetadata};
2931
use crate::table::Table;
3032
use crate::Result;
3133

@@ -44,6 +46,11 @@ impl MetadataTable {
4446
Self(table)
4547
}
4648

49+
/// Get the history table.
50+
pub fn history(&self) -> HistoryTable {
51+
HistoryTable { table: &self.0 }
52+
}
53+
4754
/// Get the snapshots table.
4855
pub fn snapshots(&self) -> SnapshotsTable {
4956
SnapshotsTable { table: &self.0 }
@@ -128,6 +135,102 @@ impl<'a> SnapshotsTable<'a> {
128135
}
129136
}
130137

138+
/// History table.
139+
///
140+
/// Shows how the table's current snapshot has changed over time and when each
141+
/// snapshot became the current snapshot.
142+
///
143+
/// Unlike the [Snapshots][SnapshotsTable], this metadata table has less detail
144+
/// per snapshot but includes ancestry information of the current snapshot.
145+
///
146+
/// `is_current_ancestor` indicates whether the snapshot is an ancestor of the
147+
/// current snapshot. If `false`, then the snapshot was rolled back.
148+
pub struct HistoryTable<'a> {
149+
table: &'a Table,
150+
}
151+
152+
impl<'a> HistoryTable<'a> {
153+
/// Return the schema of the history table.
154+
pub fn schema(&self) -> Schema {
155+
Schema::new(vec![
156+
Field::new(
157+
"made_current_at",
158+
DataType::Timestamp(TimeUnit::Millisecond, Some("+00:00".into())),
159+
false,
160+
),
161+
Field::new("snapshot_id", DataType::Int64, false),
162+
Field::new("parent_id", DataType::Int64, true),
163+
Field::new("is_current_ancestor", DataType::Boolean, false),
164+
])
165+
}
166+
167+
/// Scan the history table.
168+
pub fn scan(&self) -> Result<RecordBatch> {
169+
let table_metadata = self.table.metadata();
170+
let ancestors_by_snapshot_id: HashSet<i64> =
171+
SnapshotAncestors::from_current_snapshot(table_metadata)
172+
.map(|snapshot| snapshot.snapshot_id())
173+
.collect();
174+
175+
let mut made_current_at =
176+
PrimitiveBuilder::<TimestampMillisecondType>::new().with_timezone("+00:00");
177+
let mut snapshot_id = PrimitiveBuilder::<Int64Type>::new();
178+
let mut parent_id = PrimitiveBuilder::<Int64Type>::new();
179+
let mut is_current_ancestor = BooleanBuilder::new();
180+
181+
for snapshot in table_metadata.snapshots() {
182+
made_current_at.append_value(snapshot.timestamp_ms());
183+
snapshot_id.append_value(snapshot.snapshot_id());
184+
parent_id.append_option(snapshot.parent_snapshot_id());
185+
is_current_ancestor
186+
.append_value(ancestors_by_snapshot_id.contains(&snapshot.snapshot_id()));
187+
}
188+
189+
Ok(RecordBatch::try_new(Arc::new(Self::schema(self)), vec![
190+
Arc::new(made_current_at.finish()),
191+
Arc::new(snapshot_id.finish()),
192+
Arc::new(parent_id.finish()),
193+
Arc::new(is_current_ancestor.finish()),
194+
])?)
195+
}
196+
}
197+
198+
/// Utility to iterate parent-by-parent over the ancestors of a snapshot.
199+
struct SnapshotAncestors<'a> {
200+
table_metadata: &'a TableMetadata,
201+
snapshot: Option<&'a Snapshot>,
202+
}
203+
204+
impl<'a> SnapshotAncestors<'a> {
205+
fn from_current_snapshot(table_metadata: &'a TableMetadata) -> Self {
206+
SnapshotAncestors {
207+
table_metadata,
208+
snapshot: table_metadata.current_snapshot().map(|s| s.as_ref()),
209+
}
210+
}
211+
}
212+
213+
impl<'a> Iterator for SnapshotAncestors<'a> {
214+
type Item = &'a Snapshot;
215+
216+
/// Return the current `snapshot` and move this iterator to the parent snapshot.
217+
fn next(&mut self) -> Option<Self::Item> {
218+
if let Some(snapshot) = self.snapshot {
219+
let parent = match snapshot.parent_snapshot_id() {
220+
Some(parent_snapshot_id) => self
221+
.table_metadata
222+
.snapshot_by_id(parent_snapshot_id)
223+
.map(|s| s.as_ref()),
224+
None => None,
225+
};
226+
self.snapshot = parent;
227+
Some(snapshot)
228+
} else {
229+
None
230+
}
231+
}
232+
}
233+
131234
/// Manifests table.
132235
pub struct ManifestsTable<'a> {
133236
table: &'a Table,
@@ -381,6 +484,43 @@ mod tests {
381484
);
382485
}
383486

487+
// Scans the history metadata table of the shared test fixture and compares
// both the schema and the column contents against the inline expectations.
// The expected data shows two snapshots where the second names the first as
// its parent, and both are flagged as ancestors of the current snapshot.
#[test]
488+
fn test_history_table() {
489+
let table = TableTestFixture::new().table;
490+
let record_batch = table.metadata_table().history().scan().unwrap();
491+
check_record_batch(
492+
record_batch,
493+
expect![[r#"
494+
Field { name: "made_current_at", data_type: Timestamp(Millisecond, Some("+00:00")), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} },
495+
Field { name: "snapshot_id", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} },
496+
Field { name: "parent_id", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} },
497+
Field { name: "is_current_ancestor", data_type: Boolean, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }"#]],
498+
expect![[r#"
499+
made_current_at: PrimitiveArray<Timestamp(Millisecond, Some("+00:00"))>
500+
[
501+
2018-01-04T21:22:35.770+00:00,
502+
2019-04-12T20:29:15.770+00:00,
503+
],
504+
snapshot_id: PrimitiveArray<Int64>
505+
[
506+
3051729675574597004,
507+
3055729675574597004,
508+
],
509+
parent_id: PrimitiveArray<Int64>
510+
[
511+
null,
512+
3051729675574597004,
513+
],
514+
is_current_ancestor: BooleanArray
515+
[
516+
true,
517+
true,
518+
]"#]],
519+
// NOTE(review): presumably "no columns ignored" and "sort rows by
// `made_current_at`" — confirm against `check_record_batch`'s signature.
&[],
520+
Some("made_current_at"),
521+
);
522+
}
523+
384524
#[tokio::test]
385525
async fn test_manifests_table() {
386526
let mut fixture = TableTestFixture::new();
@@ -451,7 +591,7 @@ mod tests {
451591
partition_summaries: ListArray
452592
[
453593
StructArray
454-
-- validity:
594+
-- validity:
455595
[
456596
valid,
457597
]

0 commit comments

Comments
 (0)