Skip to content

Commit ccb48c3

Browse files
committed
fix(iceberg): Introduce new data sequence for RewriteFilesAction (#51)
* feat(iceberg): rewrite_files support use_starting_sequence_number * chore(test): add test_sequence_number_in_manifest_entry
1 parent fd23d8e commit ccb48c3

File tree

8 files changed

+199
-118
lines changed

8 files changed

+199
-118
lines changed

crates/iceberg/src/scan/context.rs

Lines changed: 1 addition & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ use crate::spec::{
3333
DataContentType, ManifestContentType, ManifestEntryRef, ManifestFile, ManifestList,
3434
ManifestStatus, Operation, SchemaRef, SnapshotRef, TableMetadataRef,
3535
};
36+
use crate::utils::ancestors_between;
3637
use crate::{Error, ErrorKind, Result};
3738

3839
type ManifestEntryFilterFn = dyn Fn(&ManifestEntryRef) -> bool + Send + Sync;
@@ -348,57 +349,3 @@ impl PlanContext {
348349
}
349350
}
350351
}
351-
352-
struct Ancestors {
353-
next: Option<SnapshotRef>,
354-
get_snapshot: Box<dyn Fn(i64) -> Option<SnapshotRef> + Send>,
355-
}
356-
357-
impl Iterator for Ancestors {
358-
type Item = SnapshotRef;
359-
360-
fn next(&mut self) -> Option<Self::Item> {
361-
let snapshot = self.next.take()?;
362-
let result = snapshot.clone();
363-
self.next = snapshot
364-
.parent_snapshot_id()
365-
.and_then(|id| (self.get_snapshot)(id));
366-
Some(result)
367-
}
368-
}
369-
370-
/// Iterate starting from `snapshot` (inclusive) to the root snapshot.
371-
fn ancestors_of(
372-
table_metadata: &TableMetadataRef,
373-
snapshot: i64,
374-
) -> Box<dyn Iterator<Item = SnapshotRef> + Send> {
375-
if let Some(snapshot) = table_metadata.snapshot_by_id(snapshot) {
376-
let table_metadata = table_metadata.clone();
377-
Box::new(Ancestors {
378-
next: Some(snapshot.clone()),
379-
get_snapshot: Box::new(move |id| table_metadata.snapshot_by_id(id).cloned()),
380-
})
381-
} else {
382-
Box::new(std::iter::empty())
383-
}
384-
}
385-
386-
/// Iterate starting from `snapshot` (inclusive) to `oldest_snapshot_id` (exclusive).
387-
fn ancestors_between(
388-
table_metadata: &TableMetadataRef,
389-
latest_snapshot_id: i64,
390-
oldest_snapshot_id: Option<i64>,
391-
) -> Box<dyn Iterator<Item = SnapshotRef> + Send> {
392-
let Some(oldest_snapshot_id) = oldest_snapshot_id else {
393-
return Box::new(ancestors_of(table_metadata, latest_snapshot_id));
394-
};
395-
396-
if latest_snapshot_id == oldest_snapshot_id {
397-
return Box::new(std::iter::empty());
398-
}
399-
400-
Box::new(
401-
ancestors_of(table_metadata, latest_snapshot_id)
402-
.take_while(move |snapshot| snapshot.snapshot_id() != oldest_snapshot_id),
403-
)
404-
}

crates/iceberg/src/spec/snapshot.rs

Lines changed: 1 addition & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -193,6 +193,7 @@ impl Snapshot {
193193
}
194194

195195
/// Get parent snapshot.
196+
#[cfg(test)]
196197
pub(crate) fn parent_snapshot(&self, table_metadata: &TableMetadata) -> Option<SnapshotRef> {
197198
match self.parent_snapshot_id {
198199
Some(id) => table_metadata.snapshot_by_id(id).cloned(),
@@ -510,33 +511,6 @@ impl SnapshotRetention {
510511
}
511512
}
512513

513-
/// An iterator over the ancestors of a snapshot.
514-
pub struct AncestorIterator<'a> {
515-
current: Option<SnapshotRef>,
516-
table_metadata: &'a TableMetadata,
517-
}
518-
519-
impl Iterator for AncestorIterator<'_> {
520-
type Item = SnapshotRef;
521-
522-
fn next(&mut self) -> Option<Self::Item> {
523-
let current = self.current.take()?;
524-
525-
let next = current.parent_snapshot(self.table_metadata);
526-
self.current = next;
527-
528-
Some(current)
529-
}
530-
}
531-
532-
/// Returns an iterator over the ancestors of a snapshot.
533-
pub fn ancestors_of(snapshot: SnapshotRef, table_metadata: &TableMetadata) -> AncestorIterator<'_> {
534-
AncestorIterator {
535-
current: Some(snapshot),
536-
table_metadata,
537-
}
538-
}
539-
540514
#[cfg(test)]
541515
mod tests {
542516
use std::collections::HashMap;

crates/iceberg/src/transaction/remove_snapshots.rs

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,10 @@ use async_trait::async_trait;
2424
use itertools::Itertools;
2525

2626
use crate::error::Result;
27-
use crate::spec::{MAIN_BRANCH, SnapshotReference, SnapshotRetention, TableMetadata, ancestors_of};
27+
use crate::spec::{MAIN_BRANCH, SnapshotReference, SnapshotRetention, TableMetadataRef};
2828
use crate::table::Table;
2929
use crate::transaction::{ActionCommit, TransactionAction};
30+
use crate::utils::ancestors_of;
3031
use crate::{Error, ErrorKind, TableRequirement, TableUpdate};
3132

3233
/// Default value for max snapshot age in milliseconds.
@@ -103,7 +104,7 @@ impl RemoveSnapshotAction {
103104
fn compute_retained_refs(
104105
&self,
105106
snapshot_refs: &HashMap<String, SnapshotReference>,
106-
table_meta: &TableMetadata,
107+
table_meta: &TableMetadataRef,
107108
) -> HashMap<String, SnapshotReference> {
108109
let mut retained_refs = HashMap::new();
109110

@@ -144,7 +145,7 @@ impl RemoveSnapshotAction {
144145
fn compute_all_branch_snapshots_to_retain(
145146
&self,
146147
refs: impl Iterator<Item = SnapshotReference>,
147-
table_meta: &TableMetadata,
148+
table_meta: &TableMetadataRef,
148149
) -> HashSet<i64> {
149150
let mut branch_snapshots_to_retain = HashSet::new();
150151
for snapshot_ref in refs {
@@ -192,11 +193,11 @@ impl RemoveSnapshotAction {
192193
snapshot_id: i64,
193194
expire_snapshots_older_than: i64,
194195
min_snapshots_to_keep: usize,
195-
table_meta: &TableMetadata,
196+
table_meta: &TableMetadataRef,
196197
) -> HashSet<i64> {
197198
let mut ids_to_retain = HashSet::new();
198199
if let Some(snapshot) = table_meta.snapshot_by_id(snapshot_id) {
199-
let ancestors = ancestors_of(snapshot.clone(), table_meta);
200+
let ancestors = ancestors_of(table_meta, snapshot.snapshot_id());
200201
for ancestor in ancestors {
201202
if ids_to_retain.len() < min_snapshots_to_keep
202203
|| ancestor.timestamp_ms() >= expire_snapshots_older_than
@@ -214,15 +215,15 @@ impl RemoveSnapshotAction {
214215
fn unreferenced_snapshots_to_retain(
215216
&self,
216217
refs: impl Iterator<Item = SnapshotReference>,
217-
table_meta: &TableMetadata,
218+
table_meta: &TableMetadataRef,
218219
) -> HashSet<i64> {
219220
let mut ids_to_retain = HashSet::new();
220221
let mut referenced_snapshots = HashSet::new();
221222

222223
for snapshot_ref in refs {
223224
if snapshot_ref.is_branch() {
224225
if let Some(snapshot) = table_meta.snapshot_by_id(snapshot_ref.snapshot_id) {
225-
let ancestors = ancestors_of(snapshot.clone(), table_meta);
226+
let ancestors = ancestors_of(table_meta, snapshot.snapshot_id());
226227
for ancestor in ancestors {
227228
referenced_snapshots.insert(ancestor.snapshot_id());
228229
}
@@ -251,7 +252,7 @@ impl TransactionAction for RemoveSnapshotAction {
251252
return Ok(ActionCommit::new(vec![], vec![]));
252253
}
253254

254-
let table_meta = table.metadata().clone();
255+
let table_meta = table.metadata_ref();
255256

256257
let mut ids_to_retain = HashSet::new();
257258
let retained_refs = self.compute_retained_refs(&table_meta.refs, &table_meta);

crates/iceberg/src/transaction/rewrite_files.rs

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,8 @@ pub struct RewriteFilesAction {
5353
removed_data_files: Vec<DataFile>,
5454
removed_delete_files: Vec<DataFile>,
5555
snapshot_id: Option<i64>,
56+
57+
new_data_file_sequence_number: Option<i64>,
5658
}
5759

5860
struct RewriteFilesOperation;
@@ -72,6 +74,7 @@ impl RewriteFilesAction {
7274
removed_data_files: Vec::new(),
7375
removed_delete_files: Vec::new(),
7476
snapshot_id: None,
77+
new_data_file_sequence_number: None,
7578
}
7679
}
7780

@@ -121,6 +124,7 @@ impl RewriteFilesAction {
121124
self.min_count_to_merge = min_count_to_merge;
122125
self.merge_enabled = merge_enabled;
123126
self.snapshot_properties = properties;
127+
124128
self
125129
}
126130

@@ -141,6 +145,12 @@ impl RewriteFilesAction {
141145
self.snapshot_id = Some(snapshot_id);
142146
self
143147
}
148+
149+
pub fn set_new_data_file_sequence_number(mut self, seq: i64) -> Result<Self> {
150+
self.new_data_file_sequence_number = Some(seq);
151+
152+
Ok(self)
153+
}
144154
}
145155

146156
impl SnapshotProduceOperation for RewriteFilesOperation {
@@ -277,7 +287,7 @@ impl SnapshotProduceOperation for RewriteFilesOperation {
277287
#[async_trait::async_trait]
278288
impl TransactionAction for RewriteFilesAction {
279289
async fn commit(self: Arc<Self>, table: &Table) -> Result<ActionCommit> {
280-
let snapshot_producer = SnapshotProducer::new(
290+
let mut snapshot_producer = SnapshotProducer::new(
281291
table,
282292
self.commit_uuid.unwrap_or_else(Uuid::now_v7),
283293
self.key_metadata.clone(),
@@ -300,6 +310,10 @@ impl TransactionAction for RewriteFilesAction {
300310
.await?;
301311
}
302312

313+
if let Some(seq) = self.new_data_file_sequence_number {
314+
snapshot_producer.set_new_data_file_sequence_number(seq);
315+
}
316+
303317
if self.merge_enabled {
304318
let process =
305319
MergeManifestProcess::new(self.target_size_bytes, self.min_count_to_merge);

crates/iceberg/src/transaction/snapshot.rs

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,8 @@ pub(crate) struct SnapshotProducer<'a> {
9191
// It starts from 0 and increments for each new manifest file.
9292
// Note: This counter is limited to the range of (0..u64::MAX).
9393
manifest_counter: RangeFrom<u64>,
94+
95+
new_data_file_sequence_number: Option<i64>,
9496
}
9597

9698
impl<'a> SnapshotProducer<'a> {
@@ -127,6 +129,7 @@ impl<'a> SnapshotProducer<'a> {
127129
removed_data_file_paths,
128130
removed_delete_file_paths,
129131
manifest_counter: (0..),
132+
new_data_file_sequence_number: None,
130133
}
131134
}
132135

@@ -287,7 +290,11 @@ impl<'a> SnapshotProducer<'a> {
287290
}
288291

289292
// Write manifest file for added data files and return the ManifestFile for ManifestList.
290-
async fn write_added_manifest(&mut self, added_files: Vec<DataFile>) -> Result<ManifestFile> {
293+
async fn write_added_manifest(
294+
&mut self,
295+
added_files: Vec<DataFile>,
296+
data_seq: Option<i64>,
297+
) -> Result<ManifestFile> {
291298
if added_files.is_empty() {
292299
return Err(Error::new(
293300
ErrorKind::PreconditionFailed,
@@ -325,7 +332,9 @@ impl<'a> SnapshotProducer<'a> {
325332
let manifest_entries = added_files.into_iter().map(|data_file| {
326333
let builder = ManifestEntry::builder()
327334
.status(crate::spec::ManifestStatus::Added)
328-
.data_file(data_file);
335+
.data_file(data_file)
336+
.sequence_number_opt(data_seq);
337+
329338
if format_version == FormatVersion::V1 {
330339
builder.snapshot_id(snapshot_id).build()
331340
} else {
@@ -423,13 +432,17 @@ impl<'a> SnapshotProducer<'a> {
423432
// Process added entries.
424433
if !self.added_data_files.is_empty() {
425434
let added_data_files = std::mem::take(&mut self.added_data_files);
426-
let added_manifest = self.write_added_manifest(added_data_files).await?;
435+
let added_manifest = self
436+
.write_added_manifest(added_data_files, self.new_data_file_sequence_number)
437+
.await?;
427438
manifest_files.push(added_manifest);
428439
}
429440

430441
if !self.added_delete_files.is_empty() {
431442
let added_delete_files = std::mem::take(&mut self.added_delete_files);
432-
let added_manifest = self.write_added_manifest(added_delete_files).await?;
443+
let added_manifest = self
444+
.write_added_manifest(added_delete_files, self.new_data_file_sequence_number)
445+
.await?;
433446
manifest_files.push(added_manifest);
434447
}
435448

@@ -601,6 +614,10 @@ impl<'a> SnapshotProducer<'a> {
601614

602615
Ok(ActionCommit::new(updates, requirements))
603616
}
617+
618+
pub fn set_new_data_file_sequence_number(&mut self, new_data_file_sequence_number: i64) {
619+
self.new_data_file_sequence_number = Some(new_data_file_sequence_number);
620+
}
604621
}
605622

606623
pub(crate) struct MergeManifestProcess {

crates/iceberg/src/utils.rs

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717

1818
use std::num::NonZeroUsize;
1919

20+
use crate::spec::{SnapshotRef, TableMetadataRef};
21+
2022
// Use a default value of 1 as the safest option.
2123
// See https://doc.rust-lang.org/std/thread/fn.available_parallelism.html#limitations
2224
// for more details.
@@ -185,3 +187,57 @@ pub mod bin {
185187
}
186188
}
187189
}
190+
191+
pub struct Ancestors {
192+
next: Option<SnapshotRef>,
193+
get_snapshot: Box<dyn Fn(i64) -> Option<SnapshotRef> + Send>,
194+
}
195+
196+
impl Iterator for Ancestors {
197+
type Item = SnapshotRef;
198+
199+
fn next(&mut self) -> Option<Self::Item> {
200+
let snapshot = self.next.take()?;
201+
let result = snapshot.clone();
202+
self.next = snapshot
203+
.parent_snapshot_id()
204+
.and_then(|id| (self.get_snapshot)(id));
205+
Some(result)
206+
}
207+
}
208+
209+
/// Iterate starting from `snapshot` (inclusive) to the root snapshot.
210+
pub fn ancestors_of(
211+
table_metadata: &TableMetadataRef,
212+
snapshot: i64,
213+
) -> Box<dyn Iterator<Item = SnapshotRef> + Send> {
214+
if let Some(snapshot) = table_metadata.snapshot_by_id(snapshot) {
215+
let table_metadata = table_metadata.clone();
216+
Box::new(Ancestors {
217+
next: Some(snapshot.clone()),
218+
get_snapshot: Box::new(move |id| table_metadata.snapshot_by_id(id).cloned()),
219+
})
220+
} else {
221+
Box::new(std::iter::empty())
222+
}
223+
}
224+
225+
/// Iterate starting from `snapshot` (inclusive) to `oldest_snapshot_id` (exclusive).
226+
pub fn ancestors_between(
227+
table_metadata: &TableMetadataRef,
228+
latest_snapshot_id: i64,
229+
oldest_snapshot_id: Option<i64>,
230+
) -> Box<dyn Iterator<Item = SnapshotRef> + Send> {
231+
let Some(oldest_snapshot_id) = oldest_snapshot_id else {
232+
return Box::new(ancestors_of(table_metadata, latest_snapshot_id));
233+
};
234+
235+
if latest_snapshot_id == oldest_snapshot_id {
236+
return Box::new(std::iter::empty());
237+
}
238+
239+
Box::new(
240+
ancestors_of(table_metadata, latest_snapshot_id)
241+
.take_while(move |snapshot| snapshot.snapshot_id() != oldest_snapshot_id),
242+
)
243+
}

0 commit comments

Comments
 (0)