Skip to content

Commit 395ff1f

Browse files
committed
Support metadata_log_entries metadata table
1 parent 328e18e commit 395ff1f

File tree

3 files changed

+160
-13
lines changed

3 files changed

+160
-13
lines changed

crates/iceberg/src/metadata_scan.rs

Lines changed: 151 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,11 @@
2020
use std::sync::Arc;
2121

2222
use arrow_array::builder::{MapBuilder, PrimitiveBuilder, StringBuilder};
23-
use arrow_array::types::{Int64Type, TimestampMillisecondType};
23+
use arrow_array::types::{Int32Type, Int64Type, TimestampMillisecondType};
2424
use arrow_array::RecordBatch;
2525
use arrow_schema::{DataType, Field, Schema, TimeUnit};
2626

27-
use crate::spec::TableMetadata;
27+
use crate::spec::{SnapshotRef, TableMetadata};
2828
use crate::table::Table;
2929
use crate::Result;
3030

@@ -35,11 +35,11 @@ use crate::Result;
3535
/// - <https://iceberg.apache.org/docs/latest/spark-queries/#querying-with-sql>
3636
/// - <https://py.iceberg.apache.org/api/#inspecting-tables>
3737
#[derive(Debug)]
38-
pub struct MetadataTable(Table);
38+
pub struct MetadataTable<'a>(&'a Table);
3939

40-
impl MetadataTable {
40+
impl<'a> MetadataTable<'a> {
4141
/// Creates a new metadata table that wraps the given table.
42-
pub(super) fn new(table: Table) -> Self {
42+
pub(super) fn new(table: &'a Table) -> Self {
4343
Self(table)
4444
}
4545

@@ -50,14 +50,25 @@ impl MetadataTable {
5050
}
5151
}
5252

53+
/// Return the metadata log entries of the table.
54+
pub fn metadata_log_entries(&self) -> MetadataLogEntriesTable {
55+
MetadataLogEntriesTable {
56+
metadata_table: self,
57+
}
58+
}
59+
5360
fn metadata(&self) -> &TableMetadata {
5461
self.0.metadata()
5562
}
63+
64+
fn metadata_location(&self) -> Option<&str> {
65+
self.0.metadata_location()
66+
}
5667
}
5768

5869
/// Snapshots table.
5970
pub struct SnapshotsTable<'a> {
60-
metadata_table: &'a MetadataTable,
71+
metadata_table: &'a MetadataTable<'a>,
6172
}
6273

6374
impl<'a> SnapshotsTable<'a> {
@@ -128,6 +139,89 @@ impl<'a> SnapshotsTable<'a> {
128139
}
129140
}
130141

142+
/// Metadata log entries table.
143+
///
144+
/// Use to inspect the current and historical metadata files in the table.
145+
/// Contains every metadata file and the time it was added. For each metadata
146+
/// file, the table contains information about the latest snapshot at the time.
147+
pub struct MetadataLogEntriesTable<'a> {
148+
metadata_table: &'a MetadataTable<'a>,
149+
}
150+
151+
impl<'a> MetadataLogEntriesTable<'a> {
152+
/// Return the schema of the metadata log entries table.
153+
pub fn schema(&self) -> Schema {
154+
Schema::new(vec![
155+
Field::new(
156+
"timestamp",
157+
DataType::Timestamp(TimeUnit::Millisecond, Some("+00:00".into())),
158+
false,
159+
),
160+
Field::new("file", DataType::Utf8, false),
161+
Field::new("latest_snapshot_id", DataType::Int64, true),
162+
Field::new("latest_schema_id", DataType::Int32, true),
163+
Field::new("latest_sequence_number", DataType::Int64, true),
164+
])
165+
}
166+
167+
/// Scan the metadata log entries table.
168+
pub fn scan(&self) -> Result<RecordBatch> {
169+
let mut timestamp =
170+
PrimitiveBuilder::<TimestampMillisecondType>::new().with_timezone("+00:00");
171+
let mut file = StringBuilder::new();
172+
let mut latest_snapshot_id = PrimitiveBuilder::<Int64Type>::new();
173+
let mut latest_schema_id = PrimitiveBuilder::<Int32Type>::new();
174+
let mut latest_sequence_number = PrimitiveBuilder::<Int64Type>::new();
175+
176+
let mut append_metadata_log_entry = |timestamp_ms: i64, metadata_file: &str| {
177+
timestamp.append_value(timestamp_ms);
178+
file.append_value(metadata_file);
179+
180+
let snapshot = self.snapshot_id_as_of_time(timestamp_ms);
181+
latest_snapshot_id.append_option(snapshot.map(|s| s.snapshot_id()));
182+
latest_schema_id.append_option(snapshot.and_then(|s| s.schema_id()));
183+
latest_sequence_number.append_option(snapshot.map(|s| s.sequence_number()));
184+
};
185+
186+
for metadata_log_entry in self.metadata_table.metadata().metadata_log() {
187+
append_metadata_log_entry(
188+
metadata_log_entry.timestamp_ms,
189+
&metadata_log_entry.metadata_file,
190+
);
191+
}
192+
193+
// Include the current metadata location and modification time in the table. This matches
194+
// the Java implementation. Unlike the Java implementation, a current metadata location is
195+
// optional here. In that case, we omit current metadata from the metadata log table.
196+
if let Some(current_metadata_location) = &self.metadata_table.metadata_location() {
197+
append_metadata_log_entry(
198+
self.metadata_table.metadata().last_updated_ms(),
199+
current_metadata_location,
200+
);
201+
}
202+
203+
Ok(RecordBatch::try_new(Arc::new(self.schema()), vec![
204+
Arc::new(timestamp.finish()),
205+
Arc::new(file.finish()),
206+
Arc::new(latest_snapshot_id.finish()),
207+
Arc::new(latest_schema_id.finish()),
208+
Arc::new(latest_sequence_number.finish()),
209+
])?)
210+
}
211+
212+
fn snapshot_id_as_of_time(&self, timestamp_ms_inclusive: i64) -> Option<&SnapshotRef> {
213+
let table_metadata = self.metadata_table.metadata();
214+
let mut snapshot_id = None;
215+
// The table metadata snapshot log is chronological
216+
for log_entry in table_metadata.history() {
217+
if log_entry.timestamp_ms <= timestamp_ms_inclusive {
218+
snapshot_id = Some(log_entry.snapshot_id);
219+
}
220+
}
221+
snapshot_id.and_then(|id| table_metadata.snapshot_by_id(id))
222+
}
223+
}
224+
131225
#[cfg(test)]
132226
mod tests {
133227
use expect_test::{expect, Expect};
@@ -253,4 +347,55 @@ mod tests {
253347
Some("committed_at"),
254348
);
255349
}
350+
351+
// Scans the metadata log entries table of the fixture table and checks two things:
// that the table's current metadata location appears in the `file` column, and that
// the schema and column contents match the expected snapshots below.
#[test]
352+
fn test_metadata_log_entries_table() {
353+
let table = TableTestFixture::new().table;
354+
let record_batch = table.metadata_table().metadata_log_entries().scan().unwrap();
355+
356+
// Check the current metadata location is included
357+
let current_metadata_location = table.metadata_location().unwrap();
358+
assert!(record_batch
359+
.column_by_name("file")
360+
.unwrap()
361+
.as_any()
362+
.downcast_ref::<arrow_array::StringArray>()
363+
.unwrap()
364+
.iter()
365+
.any(|location| location.is_some_and(|l| l.eq(current_metadata_location))));
366+
// Compare the schema (first snapshot) and the column data (second snapshot).
367+
check_record_batch(
368+
record_batch,
369+
expect![[r#"
370+
Field { name: "timestamp", data_type: Timestamp(Millisecond, Some("+00:00")), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} },
371+
Field { name: "file", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} },
372+
Field { name: "latest_snapshot_id", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} },
373+
Field { name: "latest_schema_id", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} },
374+
Field { name: "latest_sequence_number", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }"#]],
375+
expect![[r#"
376+
timestamp: PrimitiveArray<Timestamp(Millisecond, Some("+00:00"))>
377+
[
378+
1970-01-01T00:25:15.100+00:00,
379+
2020-10-14T01:22:53.590+00:00,
380+
],
381+
file: (skipped),
382+
latest_snapshot_id: PrimitiveArray<Int64>
383+
[
384+
null,
385+
3055729675574597004,
386+
],
387+
latest_schema_id: PrimitiveArray<Int32>
388+
[
389+
null,
390+
1,
391+
],
392+
latest_sequence_number: PrimitiveArray<Int64>
393+
[
394+
null,
395+
1,
396+
]"#]],
397+
// The `file` column shows up as "(skipped)" in the expected data above; the last
// argument is presumably the column used to order rows. TODO confirm against
// `check_record_batch`.
&["file"],
398+
Some("timestamp"),
399+
);
400+
}
256401
}

crates/iceberg/src/scan.rs

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1002,19 +1002,21 @@ pub mod tests {
10021002
let table_location = tmp_dir.path().join("table1");
10031003
let manifest_list1_location = table_location.join("metadata/manifests_list_1.avro");
10041004
let manifest_list2_location = table_location.join("metadata/manifests_list_2.avro");
1005+
// This is a past metadata location in the metadata log
10051006
let table_metadata1_location = table_location.join("metadata/v1.json");
1007+
// This is the actual location of current metadata
1008+
let template_json_location = format!(
1009+
"{}/testdata/example_table_metadata_v2.json",
1010+
env!("CARGO_MANIFEST_DIR")
1011+
);
10061012

10071013
let file_io = FileIO::from_path(table_location.as_os_str().to_str().unwrap())
10081014
.unwrap()
10091015
.build()
10101016
.unwrap();
10111017

10121018
let table_metadata = {
1013-
let template_json_str = fs::read_to_string(format!(
1014-
"{}/testdata/example_table_metadata_v2.json",
1015-
env!("CARGO_MANIFEST_DIR")
1016-
))
1017-
.unwrap();
1019+
let template_json_str = fs::read_to_string(&template_json_location).unwrap();
10181020
let mut context = Context::new();
10191021
context.insert("table_location", &table_location);
10201022
context.insert("manifest_list_1_location", &manifest_list1_location);
@@ -1029,7 +1031,7 @@ pub mod tests {
10291031
.metadata(table_metadata)
10301032
.identifier(TableIdent::from_strs(["db", "table1"]).unwrap())
10311033
.file_io(file_io.clone())
1032-
.metadata_location(table_metadata1_location.as_os_str().to_str().unwrap())
1034+
.metadata_location(template_json_location)
10331035
.build()
10341036
.unwrap();
10351037

crates/iceberg/src/table.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -203,7 +203,7 @@ impl Table {
203203

204204
/// Creates a metadata table which provides table-like APIs for inspecting metadata.
205205
/// See [`MetadataTable`] for more details.
206-
pub fn metadata_table(self) -> MetadataTable {
206+
pub fn metadata_table(&self) -> MetadataTable<'_> {
207207
MetadataTable::new(self)
208208
}
209209

0 commit comments

Comments
 (0)