Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions crates/iceberg/src/transaction/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ mod remove_snapshots;
mod rewrite_files;
mod snapshot;
mod sort_order;
mod utils;

use std::cmp::Ordering;
use std::collections::HashMap;
Expand Down
73 changes: 62 additions & 11 deletions crates/iceberg/src/transaction/remove_snapshots.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,23 +18,27 @@
//! Transaction action for removing snapshot.

use std::collections::{HashMap, HashSet};
use std::ops::Deref;

use itertools::Itertools;

use super::utils::ReachableFileCleanupStrategy;
use crate::error::Result;
use crate::io::FileIO;
use crate::spec::{
SnapshotReference, SnapshotRetention, MAIN_BRANCH, MAX_REF_AGE_MS, MAX_REF_AGE_MS_DEFAULT,
MAX_SNAPSHOT_AGE_MS, MAX_SNAPSHOT_AGE_MS_DEFAULT, MIN_SNAPSHOTS_TO_KEEP,
MIN_SNAPSHOTS_TO_KEEP_DEFAULT,
SnapshotReference, SnapshotRetention, TableMetadataRef, MAIN_BRANCH, MAX_REF_AGE_MS,
MAX_REF_AGE_MS_DEFAULT, MAX_SNAPSHOT_AGE_MS, MAX_SNAPSHOT_AGE_MS_DEFAULT,
MIN_SNAPSHOTS_TO_KEEP, MIN_SNAPSHOTS_TO_KEEP_DEFAULT,
};
use crate::table::Table;
use crate::transaction::Transaction;
use crate::utils::ancestors_of;
use crate::{Error, ErrorKind, TableRequirement, TableUpdate};
use crate::{Catalog, Error, ErrorKind, TableRequirement, TableUpdate};

/// RemoveSnapshotAction is a transaction action for removing snapshot.
pub struct RemoveSnapshotAction<'a> {
tx: Transaction<'a>,
clear_expire_files: bool,
clear_expired_files: bool,
ids_to_remove: HashSet<i64>,
default_expired_older_than: i64,
default_min_num_snapshots: i32,
Expand Down Expand Up @@ -69,7 +73,7 @@ impl<'a> RemoveSnapshotAction<'a> {

Self {
tx,
clear_expire_files: false,
clear_expired_files: false,
ids_to_remove: HashSet::new(),
default_expired_older_than: now - default_max_snapshot_age_ms,
default_min_num_snapshots,
Expand All @@ -80,8 +84,8 @@ impl<'a> RemoveSnapshotAction<'a> {
}

/// Sets whether files that are no longer reachable after expiration are deleted on commit.
pub fn clear_expire_files(mut self, clear_expire_files: bool) -> Self {
self.clear_expire_files = clear_expire_files;
pub fn clear_expired_files(mut self, clear_expired_files: bool) -> Self {
self.clear_expired_files = clear_expired_files;
self
}

Expand Down Expand Up @@ -110,9 +114,12 @@ impl<'a> RemoveSnapshotAction<'a> {
}

/// Finishes building the action and applies it to the transaction.
pub async fn apply(mut self) -> Result<Transaction<'a>> {
pub async fn apply(mut self) -> Result<RemoveSnapshotApplyResult<'a>> {
if self.tx.current_table.metadata().refs.is_empty() {
return Ok(self.tx);
return Ok(RemoveSnapshotApplyResult {
tx: self.tx,
clear_expired_files: self.clear_expired_files,
});
}

let table_meta = self.tx.current_table.metadata().clone();
Expand Down Expand Up @@ -255,7 +262,10 @@ impl<'a> RemoveSnapshotAction<'a> {
},
])?;

Ok(self.tx)
Ok(RemoveSnapshotApplyResult {
tx: self.tx,
clear_expired_files: self.clear_expired_files,
})
}

fn compute_retained_refs(
Expand Down Expand Up @@ -403,6 +413,47 @@ impl<'a> RemoveSnapshotAction<'a> {
}
}

/// Result of applying a remove-snapshots action: the updated transaction together
/// with the flag that decides whether expired files are cleaned up on commit.
pub struct RemoveSnapshotApplyResult<'a> {
/// Transaction carrying the snapshot-removal updates; consumed by `commit`.
tx: Transaction<'a>,
/// When `true`, `commit` runs the expired-file cleanup after the catalog commit succeeds.
clear_expired_files: bool,
}

impl RemoveSnapshotApplyResult<'_> {
    /// Commits the wrapped transaction to `catalog` and, if requested, cleans up
    /// the files that were only reachable from the expired snapshots.
    ///
    /// # Errors
    ///
    /// Returns the error from the catalog commit, or the error from the file
    /// cleanup. In the latter case the commit itself has already succeeded.
    pub async fn commit(self, catalog: &dyn Catalog) -> Result<Table> {
        let Self {
            tx,
            clear_expired_files,
        } = self;

        // Everything the cleanup needs must be captured up front, because
        // committing consumes the transaction.
        let after_expiration = tx.current_table.metadata_ref();
        let before_expiration = tx.base_table.metadata_ref();
        let file_io = tx.current_table.file_io().clone();

        let committed_table = tx.commit(catalog).await?;

        if clear_expired_files {
            Self::clean_expired_files(file_io, &before_expiration, &after_expiration).await?;
        }

        Ok(committed_table)
    }

    /// Runs the reachable-file cleanup strategy over the metadata from before
    /// and after the expiration, using `file_io` for all reads and deletes.
    async fn clean_expired_files(
        file_io: FileIO,
        before_expiration: &TableMetadataRef,
        after_expiration: &TableMetadataRef,
    ) -> Result<()> {
        ReachableFileCleanupStrategy::new(file_io)
            .clean_files(before_expiration, after_expiration)
            .await
    }
}

impl<'a> Deref for RemoveSnapshotApplyResult<'a> {
    type Target = Transaction<'a>;

    /// Borrows the wrapped transaction so its read-only API can be used directly
    /// on the apply result.
    fn deref(&self) -> &Transaction<'a> {
        &self.tx
    }
}

#[cfg(test)]
mod tests {
use std::fs::File;
Expand Down
5 changes: 4 additions & 1 deletion crates/iceberg/src/transaction/rewrite_files.rs
Original file line number Diff line number Diff line change
Expand Up @@ -273,13 +273,16 @@ impl SnapshotProduceOperation for RewriteFilesOperation {
})
.collect();

if found_deleted_files.is_empty() {
if found_deleted_files.is_empty()
&& (manifest_file.has_added_files() || manifest_file.has_existing_files())
{
existing_files.push(manifest_file.clone());
} else {
// Rewrite the manifest file without the deleted data files
if manifest
.entries()
.iter()
.filter(|entry| entry.status() != ManifestStatus::Deleted)
.any(|entry| !found_deleted_files.contains(entry.data_file().file_path()))
{
let mut manifest_writer = snapshot_produce.new_manifest_writer(
Expand Down
162 changes: 162 additions & 0 deletions crates/iceberg/src/transaction/utils.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use std::collections::HashSet;
use std::sync::Arc;

use futures::stream::{self, StreamExt};
use futures::TryStreamExt;

use crate::error::Result;
use crate::io::FileIO;
use crate::spec::{ManifestFile, Snapshot, TableMetadataRef};

/// Maximum number of file deletions kept in flight at once by the cleanup streams.
const DEFAULT_DELETE_CONCURRENCY_LIMIT: usize = 10;

/// Cleans up files left behind by snapshot expiration: it deletes the manifest
/// lists of expired snapshots, plus the manifests and data files that no retained
/// snapshot references any more.
pub struct ReachableFileCleanupStrategy {
/// Used both to load manifest lists / manifests and to delete files.
file_io: FileIO,
}

impl ReachableFileCleanupStrategy {
pub fn new(file_io: FileIO) -> Self {
Self { file_io }
}
}

impl ReachableFileCleanupStrategy {
    /// Deletes the files that were reachable only from snapshots removed by an expiration.
    ///
    /// A snapshot counts as expired when it is present in `before_expiration` but can no
    /// longer be found by id in `after_expiration`. For those snapshots this method:
    ///
    /// 1. collects every manifest they list as a deletion candidate,
    /// 2. drops the candidates that a snapshot of `after_expiration` still lists,
    /// 3. deletes the data files of the remaining manifests, except files that are also
    ///    recorded in a still-referenced manifest,
    /// 4. deletes the remaining manifests themselves, and
    /// 5. deletes the manifest lists of the expired snapshots.
    ///
    /// # Errors
    ///
    /// Returns the first error raised while loading a manifest list or manifest, or while
    /// deleting a file. Deletions that already happened are not rolled back.
    pub async fn clean_files(
        &self,
        before_expiration: &TableMetadataRef,
        after_expiration: &TableMetadataRef,
    ) -> Result<()> {
        let mut manifest_lists_to_delete: HashSet<&str> = HashSet::default();
        let mut expired_snapshots = Vec::default();
        for snapshot in before_expiration.snapshots() {
            if after_expiration
                .snapshot_by_id(snapshot.snapshot_id())
                .is_none()
            {
                expired_snapshots.push(snapshot);
                manifest_lists_to_delete.insert(snapshot.manifest_list());
            }
        }

        let deletion_candidates = {
            let mut deletion_candidates = HashSet::default();
            // This part can also be parallelized if `load_manifest_list` is a bottleneck
            // and if the underlying FileIO supports concurrent reads efficiently.
            for snapshot in expired_snapshots {
                let manifest_list = snapshot
                    .load_manifest_list(&self.file_io, before_expiration)
                    .await?;

                for manifest_file in manifest_list.entries() {
                    deletion_candidates.insert(manifest_file.clone());
                }
            }
            deletion_candidates
        };

        if !deletion_candidates.is_empty() {
            let (manifests_to_delete, referenced_manifests) = self
                .prune_referenced_manifests(
                    after_expiration.snapshots(),
                    after_expiration,
                    deletion_candidates,
                )
                .await?;

            if !manifests_to_delete.is_empty() {
                let files_to_delete = self
                    .find_files_to_delete(&manifests_to_delete, &referenced_manifests)
                    .await?;

                // Data files go first, then the manifests that listed them.
                stream::iter(files_to_delete)
                    .map(|file_path| self.file_io.delete(file_path))
                    .buffer_unordered(DEFAULT_DELETE_CONCURRENCY_LIMIT)
                    .try_collect::<Vec<_>>()
                    .await?;

                stream::iter(manifests_to_delete)
                    .map(|manifest_file| self.file_io.delete(manifest_file.manifest_path))
                    .buffer_unordered(DEFAULT_DELETE_CONCURRENCY_LIMIT)
                    .try_collect::<Vec<_>>()
                    .await?;
            }
        }

        // Manifest lists of expired snapshots are removed whether or not any
        // manifest turned out to be deletable.
        stream::iter(manifest_lists_to_delete)
            .map(|path| self.file_io.delete(path))
            .buffer_unordered(DEFAULT_DELETE_CONCURRENCY_LIMIT)
            .try_collect::<Vec<_>>()
            .await?;

        Ok(())
    }

    /// Removes from `deletion_candidates` every manifest that one of `snapshots` still lists.
    ///
    /// Returns `(remaining_candidates, referenced_manifests)`, where `referenced_manifests`
    /// holds the manifests seen in the loaded manifest lists. Scanning stops as soon as no
    /// candidate is left, so in that case `referenced_manifests` may be incomplete; the
    /// caller only uses it when candidates remain.
    ///
    /// # Errors
    ///
    /// Returns the error from loading a manifest list.
    async fn prune_referenced_manifests(
        &self,
        snapshots: impl Iterator<Item = &Arc<Snapshot>>,
        table_meta_data_ref: &TableMetadataRef,
        mut deletion_candidates: HashSet<ManifestFile>,
    ) -> Result<(HashSet<ManifestFile>, HashSet<ManifestFile>)> {
        let mut referenced_manifests = HashSet::default();
        'snapshots: for snapshot in snapshots {
            let manifest_list = snapshot
                .load_manifest_list(&self.file_io, table_meta_data_ref)
                .await?;

            for manifest_file in manifest_list.entries() {
                deletion_candidates.remove(manifest_file);
                referenced_manifests.insert(manifest_file.clone());

                if deletion_candidates.is_empty() {
                    // Nothing is left to delete: leave the outer loop too, so no
                    // further manifest lists are loaded for nothing.
                    break 'snapshots;
                }
            }
        }

        Ok((deletion_candidates, referenced_manifests))
    }

    /// Computes the data file paths that can be deleted together with `manifest_files`.
    ///
    /// The result holds every data file path recorded in `manifest_files`, minus the paths
    /// that are also recorded in any of `referenced_manifests`.
    ///
    /// # Errors
    ///
    /// Returns the error from loading a manifest instead of panicking.
    async fn find_files_to_delete(
        &self,
        manifest_files: &HashSet<ManifestFile>,
        referenced_manifests: &HashSet<ManifestFile>,
    ) -> Result<HashSet<String>> {
        let mut files_to_delete = HashSet::default();
        for manifest_file in manifest_files {
            let manifest = manifest_file.load_manifest(&self.file_io).await?;
            for entry in manifest.entries() {
                files_to_delete.insert(entry.data_file().file_path().to_owned());
            }
        }

        // No candidate files: skip loading the referenced manifests.
        if files_to_delete.is_empty() {
            return Ok(files_to_delete);
        }

        for manifest_file in referenced_manifests {
            let manifest = manifest_file.load_manifest(&self.file_io).await?;
            for entry in manifest.entries() {
                files_to_delete.remove(entry.data_file().file_path());
            }
        }

        Ok(files_to_delete)
    }
}
Loading
Loading