|
15 | 15 | // specific language governing permissions and limitations |
16 | 16 | // under the License. |
17 | 17 |
|
18 | | -//! Metadata table api. |
19 | | -
|
20 | 18 | use std::sync::Arc; |
21 | 19 |
|
22 | 20 | use arrow_array::builder::{ |
23 | | - BooleanBuilder, ListBuilder, MapBuilder, PrimitiveBuilder, StringBuilder, StructBuilder, |
| 21 | + BooleanBuilder, ListBuilder, PrimitiveBuilder, StringBuilder, StructBuilder, |
24 | 22 | }; |
25 | | -use arrow_array::types::{Int32Type, Int64Type, Int8Type, TimestampMillisecondType}; |
| 23 | +use arrow_array::types::{Int32Type, Int64Type, Int8Type}; |
26 | 24 | use arrow_array::RecordBatch; |
27 | | -use arrow_schema::{DataType, Field, Fields, Schema, TimeUnit}; |
| 25 | +use arrow_schema::{DataType, Field, Fields, Schema}; |
28 | 26 |
|
29 | 27 | use crate::table::Table; |
30 | | -use crate::Result; |
31 | | - |
32 | | -/// Metadata table is used to inspect a table's history, snapshots, and other metadata as a table. |
33 | | -/// |
34 | | -/// References: |
35 | | -/// - <https://github.com/apache/iceberg/blob/ac865e334e143dfd9e33011d8cf710b46d91f1e5/core/src/main/java/org/apache/iceberg/MetadataTableType.java#L23-L39> |
36 | | -/// - <https://iceberg.apache.org/docs/latest/spark-queries/#querying-with-sql> |
37 | | -/// - <https://py.iceberg.apache.org/api/#inspecting-tables> |
38 | | -#[derive(Debug)] |
39 | | -pub struct MetadataTable(Table); |
40 | | - |
41 | | -impl MetadataTable { |
42 | | - /// Creates a new metadata table that wraps the given table. |
43 | | - pub(super) fn new(table: Table) -> Self { |
44 | | - Self(table) |
45 | | - } |
46 | | - |
47 | | - /// Get the snapshots table. |
48 | | - pub fn snapshots(&self) -> SnapshotsTable { |
49 | | - SnapshotsTable { table: &self.0 } |
50 | | - } |
51 | | - |
52 | | - /// Get the manifests table. |
53 | | - pub fn manifests(&self) -> ManifestsTable { |
54 | | - ManifestsTable { table: &self.0 } |
55 | | - } |
56 | | -} |
57 | | - |
58 | | -/// Snapshots table. |
59 | | -pub struct SnapshotsTable<'a> { |
60 | | - table: &'a Table, |
61 | | -} |
62 | | - |
63 | | -impl<'a> SnapshotsTable<'a> { |
64 | | - /// Returns the schema of the snapshots table. |
65 | | - pub fn schema(&self) -> Schema { |
66 | | - Schema::new(vec![ |
67 | | - Field::new( |
68 | | - "committed_at", |
69 | | - DataType::Timestamp(TimeUnit::Millisecond, Some("+00:00".into())), |
70 | | - false, |
71 | | - ), |
72 | | - Field::new("snapshot_id", DataType::Int64, false), |
73 | | - Field::new("parent_id", DataType::Int64, true), |
74 | | - Field::new("operation", DataType::Utf8, false), |
75 | | - Field::new("manifest_list", DataType::Utf8, false), |
76 | | - Field::new( |
77 | | - "summary", |
78 | | - DataType::Map( |
79 | | - Arc::new(Field::new( |
80 | | - "entries", |
81 | | - DataType::Struct( |
82 | | - vec![ |
83 | | - Field::new("keys", DataType::Utf8, false), |
84 | | - Field::new("values", DataType::Utf8, true), |
85 | | - ] |
86 | | - .into(), |
87 | | - ), |
88 | | - false, |
89 | | - )), |
90 | | - false, |
91 | | - ), |
92 | | - false, |
93 | | - ), |
94 | | - ]) |
95 | | - } |
96 | | - |
97 | | - /// Scans the snapshots table. |
98 | | - pub fn scan(&self) -> Result<RecordBatch> { |
99 | | - let mut committed_at = |
100 | | - PrimitiveBuilder::<TimestampMillisecondType>::new().with_timezone("+00:00"); |
101 | | - let mut snapshot_id = PrimitiveBuilder::<Int64Type>::new(); |
102 | | - let mut parent_id = PrimitiveBuilder::<Int64Type>::new(); |
103 | | - let mut operation = StringBuilder::new(); |
104 | | - let mut manifest_list = StringBuilder::new(); |
105 | | - let mut summary = MapBuilder::new(None, StringBuilder::new(), StringBuilder::new()); |
106 | | - |
107 | | - for snapshot in self.table.metadata().snapshots() { |
108 | | - committed_at.append_value(snapshot.timestamp_ms()); |
109 | | - snapshot_id.append_value(snapshot.snapshot_id()); |
110 | | - parent_id.append_option(snapshot.parent_snapshot_id()); |
111 | | - manifest_list.append_value(snapshot.manifest_list()); |
112 | | - operation.append_value(snapshot.summary().operation.as_str()); |
113 | | - for (key, value) in &snapshot.summary().additional_properties { |
114 | | - summary.keys().append_value(key); |
115 | | - summary.values().append_value(value); |
116 | | - } |
117 | | - summary.append(true)?; |
118 | | - } |
119 | | - |
120 | | - Ok(RecordBatch::try_new(Arc::new(self.schema()), vec![ |
121 | | - Arc::new(committed_at.finish()), |
122 | | - Arc::new(snapshot_id.finish()), |
123 | | - Arc::new(parent_id.finish()), |
124 | | - Arc::new(operation.finish()), |
125 | | - Arc::new(manifest_list.finish()), |
126 | | - Arc::new(summary.finish()), |
127 | | - ])?) |
128 | | - } |
129 | | -} |
130 | 28 |
|
131 | 29 | /// Manifests table. |
132 | 30 | pub struct ManifestsTable<'a> { |
133 | 31 | table: &'a Table, |
134 | 32 | } |
135 | 33 |
|
136 | 34 | impl<'a> ManifestsTable<'a> { |
| 35 | + /// Creates a new manifests table that borrows the given table. |
| 36 | + pub fn new(table: &'a Table) -> Self { |
| 37 | + Self { table } |
| 38 | + } |
| 39 | + |
137 | 40 | fn partition_summary_fields(&self) -> Vec<Field> { |
138 | 41 | vec![ |
139 | 42 | Field::new("contains_null", DataType::Boolean, false), |
@@ -170,7 +73,7 @@ impl<'a> ManifestsTable<'a> { |
170 | 73 | } |
171 | 74 |
|
172 | 75 | /// Scans the manifests table. |
173 | | - pub async fn scan(&self) -> Result<RecordBatch> { |
| 76 | + pub async fn scan(&self) -> crate::Result<RecordBatch> { |
174 | 77 | let mut content = PrimitiveBuilder::<Int8Type>::new(); |
175 | 78 | let mut path = StringBuilder::new(); |
176 | 79 | let mut length = PrimitiveBuilder::<Int64Type>::new(); |
@@ -257,130 +160,11 @@ impl<'a> ManifestsTable<'a> { |
257 | 160 |
|
258 | 161 | #[cfg(test)] |
259 | 162 | mod tests { |
260 | | - use expect_test::{expect, Expect}; |
261 | | - use itertools::Itertools; |
| 163 | + use expect_test::expect; |
262 | 164 |
|
263 | | - use super::*; |
| 165 | + use crate::inspect::metadata_table::tests::check_record_batch; |
264 | 166 | use crate::scan::tests::TableTestFixture; |
265 | 167 |
|
266 | | - /// Snapshot testing to check the resulting record batch. |
267 | | - /// |
268 | | - /// - `expected_schema/data`: put `expect![[""]]` as a placeholder, |
269 | | - /// and then run test with `UPDATE_EXPECT=1 cargo test` to automatically update the result, |
270 | | - /// or use rust-analyzer (see [video](https://github.com/rust-analyzer/expect-test)). |
271 | | - /// Check the doc of [`expect_test`] for more details. |
272 | | - /// - `ignore_check_columns`: Some columns are not stable, so we can skip them. |
273 | | - /// - `sort_column`: The order of the data might be non-deterministic, so we can sort it by a column. |
274 | | - fn check_record_batch( |
275 | | - record_batch: RecordBatch, |
276 | | - expected_schema: Expect, |
277 | | - expected_data: Expect, |
278 | | - ignore_check_columns: &[&str], |
279 | | - sort_column: Option<&str>, |
280 | | - ) { |
281 | | - let mut columns = record_batch.columns().to_vec(); |
282 | | - if let Some(sort_column) = sort_column { |
283 | | - let column = record_batch.column_by_name(sort_column).unwrap(); |
284 | | - let indices = arrow_ord::sort::sort_to_indices(column, None, None).unwrap(); |
285 | | - columns = columns |
286 | | - .iter() |
287 | | - .map(|column| arrow_select::take::take(column.as_ref(), &indices, None).unwrap()) |
288 | | - .collect_vec(); |
289 | | - } |
290 | | - |
291 | | - expected_schema.assert_eq(&format!( |
292 | | - "{}", |
293 | | - record_batch.schema().fields().iter().format(",\n") |
294 | | - )); |
295 | | - expected_data.assert_eq(&format!( |
296 | | - "{}", |
297 | | - record_batch |
298 | | - .schema() |
299 | | - .fields() |
300 | | - .iter() |
301 | | - .zip_eq(columns) |
302 | | - .map(|(field, column)| { |
303 | | - if ignore_check_columns.contains(&field.name().as_str()) { |
304 | | - format!("{}: (skipped)", field.name()) |
305 | | - } else { |
306 | | - format!("{}: {:?}", field.name(), column) |
307 | | - } |
308 | | - }) |
309 | | - .format(",\n") |
310 | | - )); |
311 | | - } |
312 | | - |
313 | | - #[test] |
314 | | - fn test_snapshots_table() { |
315 | | - let table = TableTestFixture::new().table; |
316 | | - let record_batch = table.metadata_table().snapshots().scan().unwrap(); |
317 | | - check_record_batch( |
318 | | - record_batch, |
319 | | - expect![[r#" |
320 | | - Field { name: "committed_at", data_type: Timestamp(Millisecond, Some("+00:00")), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, |
321 | | - Field { name: "snapshot_id", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, |
322 | | - Field { name: "parent_id", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, |
323 | | - Field { name: "operation", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, |
324 | | - Field { name: "manifest_list", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, |
325 | | - Field { name: "summary", data_type: Map(Field { name: "entries", data_type: Struct([Field { name: "keys", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "values", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }]), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, false), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }"#]], |
326 | | - expect![[r#" |
327 | | - committed_at: PrimitiveArray<Timestamp(Millisecond, Some("+00:00"))> |
328 | | - [ |
329 | | - 2018-01-04T21:22:35.770+00:00, |
330 | | - 2019-04-12T20:29:15.770+00:00, |
331 | | - ], |
332 | | - snapshot_id: PrimitiveArray<Int64> |
333 | | - [ |
334 | | - 3051729675574597004, |
335 | | - 3055729675574597004, |
336 | | - ], |
337 | | - parent_id: PrimitiveArray<Int64> |
338 | | - [ |
339 | | - null, |
340 | | - 3051729675574597004, |
341 | | - ], |
342 | | - operation: StringArray |
343 | | - [ |
344 | | - "append", |
345 | | - "append", |
346 | | - ], |
347 | | - manifest_list: (skipped), |
348 | | - summary: MapArray |
349 | | - [ |
350 | | - StructArray |
351 | | - -- validity: |
352 | | - [ |
353 | | - ] |
354 | | - [ |
355 | | - -- child 0: "keys" (Utf8) |
356 | | - StringArray |
357 | | - [ |
358 | | - ] |
359 | | - -- child 1: "values" (Utf8) |
360 | | - StringArray |
361 | | - [ |
362 | | - ] |
363 | | - ], |
364 | | - StructArray |
365 | | - -- validity: |
366 | | - [ |
367 | | - ] |
368 | | - [ |
369 | | - -- child 0: "keys" (Utf8) |
370 | | - StringArray |
371 | | - [ |
372 | | - ] |
373 | | - -- child 1: "values" (Utf8) |
374 | | - StringArray |
375 | | - [ |
376 | | - ] |
377 | | - ], |
378 | | - ]"#]], |
379 | | - &["manifest_list"], |
380 | | - Some("committed_at"), |
381 | | - ); |
382 | | - } |
383 | | - |
384 | 168 | #[tokio::test] |
385 | 169 | async fn test_manifests_table() { |
386 | 170 | let mut fixture = TableTestFixture::new(); |
|
0 commit comments