1717
1818//! Metadata table api.
1919
20+ use std:: collections:: HashSet ;
2021use std:: sync:: Arc ;
2122
22- use arrow_array:: builder:: { MapBuilder , PrimitiveBuilder , StringBuilder } ;
23+ use arrow_array:: builder:: { BooleanBuilder , MapBuilder , PrimitiveBuilder , StringBuilder } ;
2324use arrow_array:: types:: { Int64Type , TimestampMillisecondType } ;
2425use arrow_array:: RecordBatch ;
2526use arrow_schema:: { DataType , Field , Schema , TimeUnit } ;
2627
27- use crate :: spec:: TableMetadata ;
28+ use crate :: spec:: { SnapshotRef , TableMetadata } ;
2829use crate :: table:: Table ;
2930use crate :: Result ;
3031
@@ -50,6 +51,13 @@ impl MetadataTable {
5051 }
5152 }
5253
54+ /// Get the history table.
55+ pub fn history ( & self ) -> HistoryTable {
56+ HistoryTable {
57+ metadata_table : self ,
58+ }
59+ }
60+
5361 fn metadata ( & self ) -> & TableMetadata {
5462 self . 0 . metadata ( )
5563 }
@@ -128,6 +136,99 @@ impl<'a> SnapshotsTable<'a> {
128136 }
129137}
130138
139+ /// History table.
140+ ///
141+ /// Shows how the table's current snapshot has changed over time and when each
142+ /// snapshot became the current snapshot.
143+ ///
144+ /// Unlike the [Snapshots][SnapshotsTable], this metadata table has less detail
145+ /// per snapshot but includes ancestry information of the current snapshot.
146+ ///
147+ /// `is_current_ancestor` indicates whether the snapshot is an ancestor of the
148+ /// current snapshot. If `false`, then the snapshot was rolled back.
149+ pub struct HistoryTable < ' a > {
150+ metadata_table : & ' a MetadataTable ,
151+ }
152+
153+ impl < ' a > HistoryTable < ' a > {
154+ /// Return the schema of the history table.
155+ pub fn schema ( & self ) -> Schema {
156+ Schema :: new ( vec ! [
157+ Field :: new(
158+ "made_current_at" ,
159+ DataType :: Timestamp ( TimeUnit :: Millisecond , Some ( "+00:00" . into( ) ) ) ,
160+ false ,
161+ ) ,
162+ Field :: new( "snapshot_id" , DataType :: Int64 , false ) ,
163+ Field :: new( "parent_id" , DataType :: Int64 , true ) ,
164+ Field :: new( "is_current_ancestor" , DataType :: Boolean , false ) ,
165+ ] )
166+ }
167+
168+ /// Scan the history table.
169+ pub fn scan ( & self ) -> Result < RecordBatch > {
170+ let table_metadata = self . metadata_table . metadata ( ) ;
171+ let ancestors_by_snapshot_id: HashSet < i64 > =
172+ SnapshotAncestors :: from_current_snapshot ( table_metadata)
173+ . map ( |snapshot| snapshot. snapshot_id ( ) )
174+ . collect ( ) ;
175+
176+ let mut made_current_at =
177+ PrimitiveBuilder :: < TimestampMillisecondType > :: new ( ) . with_timezone ( "+00:00" ) ;
178+ let mut snapshot_id = PrimitiveBuilder :: < Int64Type > :: new ( ) ;
179+ let mut parent_id = PrimitiveBuilder :: < Int64Type > :: new ( ) ;
180+ let mut is_current_ancestor = BooleanBuilder :: new ( ) ;
181+
182+ for snapshot in table_metadata. snapshots ( ) {
183+ made_current_at. append_value ( snapshot. timestamp_ms ( ) ) ;
184+ snapshot_id. append_value ( snapshot. snapshot_id ( ) ) ;
185+ parent_id. append_option ( snapshot. parent_snapshot_id ( ) ) ;
186+ is_current_ancestor
187+ . append_value ( ancestors_by_snapshot_id. contains ( & snapshot. snapshot_id ( ) ) ) ;
188+ }
189+
190+ Ok ( RecordBatch :: try_new ( Arc :: new ( Self :: schema ( self ) ) , vec ! [
191+ Arc :: new( made_current_at. finish( ) ) ,
192+ Arc :: new( snapshot_id. finish( ) ) ,
193+ Arc :: new( parent_id. finish( ) ) ,
194+ Arc :: new( is_current_ancestor. finish( ) ) ,
195+ ] ) ?)
196+ }
197+ }
198+
199+ /// Utility to iterate parent-by-parent over the ancestors of a snapshot.
200+ struct SnapshotAncestors < ' a > {
201+ table_metadata : & ' a TableMetadata ,
202+ snapshot : Option < & ' a SnapshotRef > ,
203+ }
204+
205+ impl < ' a > SnapshotAncestors < ' a > {
206+ fn from_current_snapshot ( table_metadata : & ' a TableMetadata ) -> Self {
207+ SnapshotAncestors {
208+ table_metadata,
209+ snapshot : table_metadata. current_snapshot ( ) ,
210+ }
211+ }
212+ }
213+
214+ impl < ' a > Iterator for SnapshotAncestors < ' a > {
215+ type Item = & ' a SnapshotRef ;
216+
217+ /// Return the current `snapshot` and move this iterator to the parent snapshot.
218+ fn next ( & mut self ) -> Option < Self :: Item > {
219+ if let Some ( snapshot) = self . snapshot {
220+ let parent = match snapshot. parent_snapshot_id ( ) {
221+ Some ( parent_snapshot_id) => self . table_metadata . snapshot_by_id ( parent_snapshot_id) ,
222+ None => None ,
223+ } ;
224+ self . snapshot = parent;
225+ Some ( snapshot)
226+ } else {
227+ None
228+ }
229+ }
230+ }
231+
131232#[ cfg( test) ]
132233mod tests {
133234 use expect_test:: { expect, Expect } ;
@@ -253,4 +354,41 @@ mod tests {
253354 Some ( "committed_at" ) ,
254355 ) ;
255356 }
357+
358+ #[ test]
359+ fn test_history_table ( ) {
360+ let table = TableTestFixture :: new ( ) . table ;
361+ let record_batch = table. metadata_table ( ) . history ( ) . scan ( ) . unwrap ( ) ;
362+ check_record_batch (
363+ record_batch,
364+ expect ! [ [ r#"
365+ Field { name: "made_current_at", data_type: Timestamp(Millisecond, Some("+00:00")), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} },
366+ Field { name: "snapshot_id", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} },
367+ Field { name: "parent_id", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} },
368+ Field { name: "is_current_ancestor", data_type: Boolean, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }"# ] ] ,
369+ expect ! [ [ r#"
370+ made_current_at: PrimitiveArray<Timestamp(Millisecond, Some("+00:00"))>
371+ [
372+ 2018-01-04T21:22:35.770+00:00,
373+ 2019-04-12T20:29:15.770+00:00,
374+ ],
375+ snapshot_id: PrimitiveArray<Int64>
376+ [
377+ 3051729675574597004,
378+ 3055729675574597004,
379+ ],
380+ parent_id: PrimitiveArray<Int64>
381+ [
382+ null,
383+ 3051729675574597004,
384+ ],
385+ is_current_ancestor: BooleanArray
386+ [
387+ true,
388+ true,
389+ ]"# ] ] ,
390+ & [ ] ,
391+ Some ( "made_current_at" ) ,
392+ ) ;
393+ }
256394}
0 commit comments