From 07a1a3b76fded497d1146cb0f975e396df629ff3 Mon Sep 17 00:00:00 2001 From: ZENOTME Date: Wed, 23 Apr 2025 22:36:41 +0800 Subject: [PATCH 1/2] support rewrite manifest --- crates/iceberg/src/spec/manifest/entry.rs | 6 + crates/iceberg/src/transaction/mod.rs | 42 ++ .../src/transaction/rewrite_manifest.rs | 389 ++++++++++ crates/iceberg/src/transaction/snapshot.rs | 121 ++-- .../tests/shared_tests/mod.rs | 1 + .../shared_tests/rewrite_manifest_test.rs | 679 ++++++++++++++++++ 6 files changed, 1198 insertions(+), 40 deletions(-) create mode 100644 crates/iceberg/src/transaction/rewrite_manifest.rs create mode 100644 crates/integration_tests/tests/shared_tests/rewrite_manifest_test.rs diff --git a/crates/iceberg/src/spec/manifest/entry.rs b/crates/iceberg/src/spec/manifest/entry.rs index f02b3014f4..b066d4b7be 100644 --- a/crates/iceberg/src/spec/manifest/entry.rs +++ b/crates/iceberg/src/spec/manifest/entry.rs @@ -136,6 +136,12 @@ impl ManifestEntry { self.sequence_number } + /// File sequence number. + #[inline] + pub fn file_sequence_number(&self) -> Option { + self.file_sequence_number + } + /// File size in bytes. #[inline] pub fn file_size_in_bytes(&self) -> u64 { diff --git a/crates/iceberg/src/transaction/mod.rs b/crates/iceberg/src/transaction/mod.rs index 6ae25775b4..d3ee13412c 100644 --- a/crates/iceberg/src/transaction/mod.rs +++ b/crates/iceberg/src/transaction/mod.rs @@ -18,14 +18,20 @@ //! This module contains transaction api. mod append; +mod rewrite_manifest; +pub use rewrite_manifest::{ + CREATED_MANIFESTS_COUNT, KEPT_MANIFESTS_COUNT, PROCESSED_ENTRY_COUNT, REPLACED_MANIFESTS_COUNT, +}; mod snapshot; mod sort_order; use std::cmp::Ordering; use std::collections::HashMap; +use std::hash::Hash; use std::mem::discriminant; use std::sync::Arc; +use rewrite_manifest::{ClusterFunc, PredicateFunc, RewriteManifsetAction}; use uuid::Uuid; use crate::error::Result; @@ -166,6 +172,42 @@ impl<'a> Transaction<'a> { ) } + /// Rewrite manifest file. + pub fn rewrite_manifest( + self, + cluster_by_func: Option>, + manifest_predicate: Option, + snapshot_id: Option, + commit_uuid: Option, + key_metadata: Vec, + ) -> Result> { + let snapshot_id = if let Some(snapshot_id) = snapshot_id { + if self + .current_table + .metadata() + .snapshots() + .any(|s| s.snapshot_id() == snapshot_id) + { + return Err(Error::new( + ErrorKind::DataInvalid, + format!("Snapshot id {} already exists", snapshot_id), + )); + } + snapshot_id + } else { + self.generate_unique_snapshot_id() + }; + RewriteManifsetAction::new( + self, + cluster_by_func, + manifest_predicate, + snapshot_id, + commit_uuid.unwrap_or_else(Uuid::now_v7), + key_metadata, + HashMap::new(), + ) + } + /// Creates replace sort order action. pub fn replace_sort_order(self) -> ReplaceSortOrderAction<'a> { ReplaceSortOrderAction { diff --git a/crates/iceberg/src/transaction/rewrite_manifest.rs b/crates/iceberg/src/transaction/rewrite_manifest.rs new file mode 100644 index 0000000000..b21c9986f6 --- /dev/null +++ b/crates/iceberg/src/transaction/rewrite_manifest.rs @@ -0,0 +1,389 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::collections::hash_map::Entry; +use std::collections::{HashMap, HashSet}; +use std::future::Future; +use std::hash::Hash; +use std::pin::Pin; + +use itertools::Itertools; +use uuid::Uuid; + +use super::snapshot::{DefaultManifestProcess, SnapshotProduceAction, SnapshotProduceOperation}; +use super::Transaction; +use crate::spec::{ + DataFile, ManifestContentType, ManifestEntry, ManifestFile, ManifestList, ManifestWriter, + Operation, UNASSIGNED_SEQUENCE_NUMBER, UNASSIGNED_SNAPSHOT_ID, +}; +use crate::{Error, ErrorKind, Result}; + +/// New created manifest count during rewrite manifest action. +pub const CREATED_MANIFESTS_COUNT: &str = "manifests-created"; +/// Kept manifest count during rewrite manifest action. +pub const KEPT_MANIFESTS_COUNT: &str = "manifests-kept"; +/// Count of manifest been rewrite and delete during rewrite manifest action. +pub const REPLACED_MANIFESTS_COUNT: &str = "manifests-replaced"; +/// Count of manifest entry been process during rewrite manifest action. +pub const PROCESSED_ENTRY_COUNT: &str = "entries-processed"; + +pub type ClusterFunc = Box T>; +pub type PredicateFunc = Box Pin + Send>>>; + +/// Action used for rewriting manifests for a table. +pub struct RewriteManifsetAction<'a, T> { + cluster_by_func: Option>, + manifset_predicate: Option, + manifset_writers: HashMap<(T, i32), ManifestWriter>, + + // Manifest file that user added to the snapshot + added_manifests: Vec, + // Manifest file that user deleted from the snapshot + deleted_manifests: Vec, + // New manifest files that generated after rewriting + new_manifests: Vec, + // Original manifest file that don't need to rewrite + keep_manifests: Vec, + + // Used to record the manifests that need to be rewritten + rewrite_manifests: HashSet, + + snapshot_produce_action: SnapshotProduceAction<'a>, + + /// Statistics for count of process manifest entries + process_entry_count: usize, +} + +impl<'a, T: Hash + Eq> RewriteManifsetAction<'a, T> { + pub(crate) fn new( + tx: Transaction<'a>, + cluster_by_func: Option>, + manifest_predicate: Option, + snapshot_id: i64, + commit_uuid: Uuid, + key_metadata: Vec, + snapshot_properties: HashMap, + ) -> Result { + Ok(Self { + cluster_by_func, + manifset_predicate: manifest_predicate, + manifset_writers: HashMap::new(), + added_manifests: vec![], + deleted_manifests: vec![], + new_manifests: vec![], + keep_manifests: vec![], + snapshot_produce_action: SnapshotProduceAction::new( + tx, + snapshot_id, + key_metadata, + commit_uuid, + snapshot_properties, + )?, + rewrite_manifests: HashSet::new(), + process_entry_count: 0, + }) + } + + /// Add the manifset file for new snapshot + pub fn add_manifest(&mut self, manifest: ManifestFile) -> Result<()> { + if manifest.has_added_files() { + return Err(Error::new( + ErrorKind::DataInvalid, + "Cannot add manifest file with added files to the snapshot in RewriteManifest action", + )); + } + if manifest.has_deleted_files() { + return Err(Error::new( + ErrorKind::DataInvalid, + "Cannot add manifest file with deleted files to the snapshot in RewriteManifest action", + )); + } + if manifest.added_snapshot_id != UNASSIGNED_SNAPSHOT_ID { + return Err(Error::new( + ErrorKind::DataInvalid, + "Cannot add manifest file with non-empty snapshot id to the snapshot in RewriteManifest action. Snapshot id will be assigned during commit", + )); + } + if manifest.sequence_number != UNASSIGNED_SEQUENCE_NUMBER { + return Err(Error::new( + ErrorKind::DataInvalid, + "Cannot add manifest file with non-empty sequence number to the snapshot in RewriteManifest action. Sequence number will be assigned during commit", + )); + } + + if self.snapshot_produce_action.enable_inherit_snapshot_id { + self.added_manifests.push(manifest); + } else { + // # TODO + // For table can't inherit snapshot id, we should rewrite the whole manifest file and add it at rewritten_added_manifests. + // See: https://github.com/apache/iceberg/blob/4dbcdfc85a64dc1d97d7434e353c7f9e4c18e1b3/core/src/main/java/org/apache/iceberg/BaseRewriteManifests.java#L48 + return Err(Error::new( + ErrorKind::FeatureUnsupported, + "Rewrite manifest file is supported: Cannot add manifest file to the snapshot in RewriteManifest action when snapshot id inheritance is disabled", + )); + } + Ok(()) + } + + /// Delete the manifset file from snapshot + pub fn delete_manifset(&mut self, manifset: ManifestFile) -> Result<()> { + self.deleted_manifests.push(manifset); + Ok(()) + } + + fn require_rewrite(&self) -> bool { + self.cluster_by_func.is_some() + } + + #[inline] + async fn if_rewrite(&self, manifest: &ManifestFile) -> bool { + // Always rewrite if not predicate is provided + if let Some(predicate) = self.manifset_predicate.as_ref() { + predicate(manifest).await + } else { + true + } + } + + async fn perform_rewrite(&mut self, manifest_list: ManifestList) -> Result<()> { + let remain_manifest = manifest_list + .consume_entries() + .into_iter() + .filter(|manifest| !self.deleted_manifests.contains(manifest)); + for manifest in remain_manifest { + if manifest.content == ManifestContentType::Deletes || !self.if_rewrite(&manifest).await + { + self.keep_manifests.push(manifest.clone()); + } else { + self.rewrite_manifests.insert(manifest.clone()); + let (manifest_entries, _) = manifest + .load_manifest(self.snapshot_produce_action.tx.current_table.file_io()) + .await? + .into_parts(); + // # TODO + // If we ignore delete entry here, how to clean the file? + for entry in manifest_entries.into_iter().filter(|e| e.is_alive()) { + let key = ( + self.cluster_by_func + .as_ref() + .expect("Never enter this function if cluster_by_func is None")( + entry.data_file(), + ), + manifest.partition_spec_id, + ); + match self.manifset_writers.entry(key) { + Entry::Occupied(mut e) => { + // # TODO + // Close when file reach target size and reset the writer + e.get_mut().add_existing_entry(entry.as_ref().clone())?; + } + Entry::Vacant(e) => { + let mut writer = self.snapshot_produce_action.new_manifest_writer( + &manifest.content, + manifest.partition_spec_id, + )?; + writer.add_existing_entry(entry.as_ref().clone())?; + e.insert(writer); + } + } + self.process_entry_count += 1; + } + } + } + // write all manifest files + for (_, writer) in self.manifset_writers.drain() { + let manifest_file = writer.write_manifest_file().await?; + self.new_manifests.push(manifest_file); + } + Ok(()) + } + + fn keep_active_manifests(&mut self, manifest_list: ManifestList) -> Result<()> { + self.keep_manifests.clear(); + self.keep_manifests + .extend( + manifest_list + .consume_entries() + .into_iter() + .filter(|manifest| { + // # TODO + // Which case will reach here? + !self.rewrite_manifests.contains(manifest) + && !self.deleted_manifests.contains(manifest) + }), + ); + Ok(()) + } + + #[inline] + fn active_file_count<'t>(manifest_iter: impl Iterator) -> Result { + let mut count = 0; + for manifest in manifest_iter { + count += manifest.added_files_count.ok_or_else(|| { + Error::new( + ErrorKind::DataInvalid, + "Manifest file should have added files count", + ) + })?; + count += manifest.existing_files_count.ok_or_else(|| { + Error::new( + ErrorKind::DataInvalid, + "Manifest file should have existing files count", + ) + })?; + } + Ok(count) + } + + async fn validate_files_counts(&self) -> Result<()> { + let create_manifest = self.new_manifests.iter().chain(self.added_manifests.iter()); + let create_manifest_file_count = Self::active_file_count(create_manifest)?; + + let replaced_manifest = self + .rewrite_manifests + .iter() + .chain(self.deleted_manifests.iter()); + let replaced_manifest_file_count = Self::active_file_count(replaced_manifest)?; + + if replaced_manifest_file_count != create_manifest_file_count { + return Err(Error::new( + ErrorKind::DataInvalid, + "The number of files in the new manifest files should be equal to the number of files in the replaced manifest files", + )); + } + + Ok(()) + } + + fn summary(&self) -> Result> { + let mut summary = HashMap::new(); + summary.insert( + CREATED_MANIFESTS_COUNT.to_string(), + (self.new_manifests.len() + self.added_manifests.len()).to_string(), + ); + summary.insert( + KEPT_MANIFESTS_COUNT.to_string(), + self.keep_manifests.len().to_string(), + ); + summary.insert( + REPLACED_MANIFESTS_COUNT.to_string(), + (self.rewrite_manifests.len() + self.deleted_manifests.len()).to_string(), + ); + summary.insert( + PROCESSED_ENTRY_COUNT.to_string(), + self.process_entry_count.to_string(), + ); + // # TODO + // Sets the maximum number of changed partitions before partition summaries will be excluded. + Ok(summary) + } + + /// Apply the change to table + pub async fn apply(mut self) -> Result> { + // read all manifest files of current snapshot + let current_manifests_list = if let Some(snapshot) = self + .snapshot_produce_action + .tx + .current_table + .metadata() + .current_snapshot() + { + snapshot + .load_manifest_list( + self.snapshot_produce_action.tx.current_table.file_io(), + &self.snapshot_produce_action.tx.current_table.metadata_ref(), + ) + .await? + } else { + // Do nothing for empty snapshot + return Ok(self.snapshot_produce_action.tx); + }; + + // validate delete manifest file, make sure they are in the current manifest + if self + .deleted_manifests + .iter() + .any(|m| !current_manifests_list.entries().contains(m)) + { + return Err(Error::new( + ErrorKind::DataInvalid, + "Cannot delete manifest file that is not in the current snapshot", + )); + } + + if self.require_rewrite() { + self.perform_rewrite(current_manifests_list).await?; + } else { + self.keep_active_manifests(current_manifests_list)?; + } + + self.validate_files_counts().await?; + + let summary = self.summary()?; + + // Rewrite the snapshot id of all added manifest + let existing_manifests = self + .new_manifests + .into_iter() + .chain(self.added_manifests.into_iter()) + .map(|mut manifest_file| { + manifest_file.added_snapshot_id = self.snapshot_produce_action.snapshot_id; + manifest_file + }) + .chain(self.keep_manifests.into_iter()) + .collect_vec(); + + self.snapshot_produce_action + .apply( + RewriteManifsetActionOperation { + existing_manifests, + summary, + }, + DefaultManifestProcess, + ) + .await + } +} + +struct RewriteManifsetActionOperation { + existing_manifests: Vec, + summary: HashMap, +} + +impl SnapshotProduceOperation for RewriteManifsetActionOperation { + fn operation(&self) -> Operation { + Operation::Replace + } + + async fn delete_entries( + &self, + _snapshot_produce: &SnapshotProduceAction<'_>, + ) -> Result> { + Ok(vec![]) + } + + async fn existing_manifest( + &self, + _snapshot_produce: &SnapshotProduceAction<'_>, + ) -> Result> { + Ok(self.existing_manifests.clone()) + } + + fn summary(&self) -> HashMap { + self.summary.clone() + } +} diff --git a/crates/iceberg/src/transaction/snapshot.rs b/crates/iceberg/src/transaction/snapshot.rs index ee9721c16c..ab0fcb6353 100644 --- a/crates/iceberg/src/transaction/snapshot.rs +++ b/crates/iceberg/src/transaction/snapshot.rs @@ -22,18 +22,20 @@ use std::ops::RangeFrom; use uuid::Uuid; use crate::error::Result; -use crate::io::OutputFile; use crate::spec::{ - update_snapshot_summaries, DataFile, DataFileFormat, FormatVersion, ManifestEntry, - ManifestFile, ManifestListWriter, ManifestWriterBuilder, Operation, Snapshot, - SnapshotReference, SnapshotRetention, SnapshotSummaryCollector, Struct, StructType, Summary, - MAIN_BRANCH, PROPERTY_WRITE_PARTITION_SUMMARY_LIMIT, + update_snapshot_summaries, DataFile, DataFileFormat, FormatVersion, ManifestContentType, + ManifestEntry, ManifestFile, ManifestListWriter, ManifestWriter, ManifestWriterBuilder, + Operation, Snapshot, SnapshotReference, SnapshotRetention, SnapshotSummaryCollector, Struct, + StructType, Summary, MAIN_BRANCH, PROPERTY_WRITE_PARTITION_SUMMARY_LIMIT, PROPERTY_WRITE_PARTITION_SUMMARY_LIMIT_DEFAULT, }; use crate::transaction::Transaction; use crate::{Error, ErrorKind, TableRequirement, TableUpdate}; const META_ROOT_PATH: &str = "metadata"; +/// Whether allow to inherit snapshot id. +pub const SNAPSHOT_ID_INHERITANCE_ENABLED: &str = "compatibility.snapshot-id-inheritance.enabled"; +const SNAPSHOT_ID_INHERITANCE_ENABLED_DEFAULT: bool = false; pub(crate) trait SnapshotProduceOperation: Send + Sync { fn operation(&self) -> Operation; @@ -46,31 +48,36 @@ pub(crate) trait SnapshotProduceOperation: Send + Sync { &self, snapshot_produce: &SnapshotProduceAction, ) -> impl Future>> + Send; + fn summary(&self) -> HashMap { + HashMap::new() + } } -pub(crate) struct DefaultManifestProcess; - -impl ManifestProcess for DefaultManifestProcess { +pub(crate) trait ManifestProcess: Send + Sync { fn process_manifeset(&self, manifests: Vec) -> Vec { manifests } + fn summary(&self) -> HashMap { + HashMap::new() + } } -pub(crate) trait ManifestProcess: Send + Sync { - fn process_manifeset(&self, manifests: Vec) -> Vec; -} +pub(crate) struct DefaultManifestProcess; + +impl ManifestProcess for DefaultManifestProcess {} pub(crate) struct SnapshotProduceAction<'a> { pub tx: Transaction<'a>, - snapshot_id: i64, - key_metadata: Vec, - commit_uuid: Uuid, - snapshot_properties: HashMap, + pub snapshot_id: i64, + pub key_metadata: Vec, + pub commit_uuid: Uuid, + pub snapshot_properties: HashMap, pub added_data_files: Vec, // A counter used to generate unique manifest file names. // It starts from 0 and increments for each new manifest file. // Note: This counter is limited to the range of (0..u64::MAX). - manifest_counter: RangeFrom, + pub manifest_counter: RangeFrom, + pub enable_inherit_snapshot_id: bool, } impl<'a> SnapshotProduceAction<'a> { @@ -81,6 +88,15 @@ impl<'a> SnapshotProduceAction<'a> { commit_uuid: Uuid, snapshot_properties: HashMap, ) -> Result { + let enable_inherit_snapshot_id = tx.current_table.metadata().format_version() + > FormatVersion::V1 + || tx + .current_table + .metadata() + .properties() + .get(SNAPSHOT_ID_INHERITANCE_ENABLED) + .and_then(|s| s.parse().ok()) + .unwrap_or(SNAPSHOT_ID_INHERITANCE_ENABLED_DEFAULT); Ok(Self { tx, snapshot_id, @@ -89,6 +105,7 @@ impl<'a> SnapshotProduceAction<'a> { added_data_files: vec![], manifest_counter: (0..), key_metadata, + enable_inherit_snapshot_id, }) } @@ -154,7 +171,11 @@ impl<'a> SnapshotProduceAction<'a> { Ok(self) } - fn new_manifest_output(&mut self) -> Result { + pub fn new_manifest_writer( + &mut self, + content_type: &ManifestContentType, + partition_spec_id: i32, + ) -> Result { let new_manifest_path = format!( "{}/{}/{}-m{}.{}", self.tx.current_table.metadata().location(), @@ -163,10 +184,40 @@ impl<'a> SnapshotProduceAction<'a> { self.manifest_counter.next().unwrap(), DataFileFormat::Avro ); - self.tx + let output = self + .tx .current_table .file_io() - .new_output(new_manifest_path) + .new_output(new_manifest_path)?; + let partition_spec = self + .tx + .current_table + .metadata() + .partition_spec_by_id(partition_spec_id) + .ok_or_else(|| { + Error::new( + ErrorKind::DataInvalid, + "Invalid partition spec id for new manifest writer", + ) + .with_context("partition spec id", partition_spec_id.to_string()) + })? + .as_ref() + .clone(); + let builder = ManifestWriterBuilder::new( + output, + Some(self.snapshot_id), + self.key_metadata.clone(), + self.tx.current_table.metadata().current_schema().clone(), + partition_spec, + ); + if self.tx.current_table.metadata().format_version() == FormatVersion::V1 { + Ok(builder.build_v1()) + } else { + match content_type { + ManifestContentType::Data => Ok(builder.build_v2_data()), + ManifestContentType::Deletes => Ok(builder.build_v2_deletes()), + } + } } // Write manifest file for added data files and return the ManifestFile for ManifestList. @@ -186,25 +237,10 @@ impl<'a> SnapshotProduceAction<'a> { builder.build() } }); - let mut writer = { - let builder = ManifestWriterBuilder::new( - self.new_manifest_output()?, - Some(self.snapshot_id), - self.key_metadata.clone(), - self.tx.current_table.metadata().current_schema().clone(), - self.tx - .current_table - .metadata() - .default_partition_spec() - .as_ref() - .clone(), - ); - if self.tx.current_table.metadata().format_version() == FormatVersion::V1 { - builder.build_v1() - } else { - builder.build_v2_data() - } - }; + let mut writer = self.new_manifest_writer( + &ManifestContentType::Data, + self.tx.current_table.metadata().default_partition_spec_id(), + )?; for entry in manifest_entries { writer.add_entry(entry)?; } @@ -228,9 +264,10 @@ impl<'a> SnapshotProduceAction<'a> { } // Returns a `Summary` of the current snapshot - fn summary( + fn summary( &self, snapshot_produce_operation: &OP, + manifest_process: &MP, ) -> Result { let mut summary_collector = SnapshotSummaryCollector::default(); let table_metadata = self.tx.current_table.metadata_ref(); @@ -266,6 +303,10 @@ impl<'a> SnapshotProduceAction<'a> { let mut additional_properties = summary_collector.build(); additional_properties.extend(self.snapshot_properties.clone()); + // Included the summary from the snapshot produce operation and manifest process. + additional_properties.extend(snapshot_produce_operation.summary()); + additional_properties.extend(manifest_process.summary()); + let summary = Summary { operation: snapshot_produce_operation.operation(), additional_properties, @@ -302,7 +343,7 @@ impl<'a> SnapshotProduceAction<'a> { let next_seq_num = self.tx.current_table.metadata().next_sequence_number(); let summary = self - .summary(&snapshot_produce_operation) + .summary(&snapshot_produce_operation, &process) .map_err(|err| { Error::new(ErrorKind::Unexpected, "Failed to create snapshot summary.") .with_source(err) diff --git a/crates/integration_tests/tests/shared_tests/mod.rs b/crates/integration_tests/tests/shared_tests/mod.rs index feb1c4e585..e27a9ab3bc 100644 --- a/crates/integration_tests/tests/shared_tests/mod.rs +++ b/crates/integration_tests/tests/shared_tests/mod.rs @@ -29,6 +29,7 @@ mod conflict_commit_test; mod datafusion; mod read_evolved_schema; mod read_positional_deletes; +mod rewrite_manifest_test; mod scan_all_type; pub async fn random_ns() -> Namespace { diff --git a/crates/integration_tests/tests/shared_tests/rewrite_manifest_test.rs b/crates/integration_tests/tests/shared_tests/rewrite_manifest_test.rs new file mode 100644 index 0000000000..dbc4b7b4bc --- /dev/null +++ b/crates/integration_tests/tests/shared_tests/rewrite_manifest_test.rs @@ -0,0 +1,679 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Integration tests for rest catalog. + +use std::sync::Arc; + +use arrow_array::{ArrayRef, BooleanArray, Int32Array, RecordBatch, StringArray}; +use futures::FutureExt; +use iceberg::io::FileIO; +use iceberg::spec::{ + DataFile, ManifestEntry, ManifestFile, ManifestStatus, ManifestWriterBuilder, SnapshotRef, +}; +use iceberg::table::Table; +use iceberg::transaction::{ + Transaction, CREATED_MANIFESTS_COUNT, KEPT_MANIFESTS_COUNT, PROCESSED_ENTRY_COUNT, + REPLACED_MANIFESTS_COUNT, +}; +use iceberg::writer::base_writer::data_file_writer::DataFileWriterBuilder; +use iceberg::writer::file_writer::location_generator::{ + DefaultFileNameGenerator, DefaultLocationGenerator, +}; +use iceberg::writer::file_writer::ParquetWriterBuilder; +use iceberg::writer::{IcebergWriter, IcebergWriterBuilder}; +use iceberg::{Catalog, TableCreation}; +use iceberg_catalog_rest::RestCatalog; +use parquet::file::properties::WriterProperties; + +use crate::get_shared_containers; +use crate::shared_tests::{random_ns, test_schema}; + +async fn generate_data_file(table: &Table, name: &str) -> Vec { + let schema: Arc = Arc::new( + table + .metadata() + .current_schema() + .as_ref() + .try_into() + .unwrap(), + ); + let location_generator = DefaultLocationGenerator::new(table.metadata().clone()).unwrap(); + let file_name_generator = DefaultFileNameGenerator::new( + name.to_string(), + None, + iceberg::spec::DataFileFormat::Parquet, + ); + let parquet_writer_builder = ParquetWriterBuilder::new( + WriterProperties::default(), + table.metadata().current_schema().clone(), + table.file_io().clone(), + location_generator.clone(), + file_name_generator.clone(), + ); + let data_file_writer_builder = DataFileWriterBuilder::new(parquet_writer_builder, None, 1); + let mut data_file_writer = data_file_writer_builder.build().await.unwrap(); + let col1 = StringArray::from(vec![Some("foo"), Some("bar"), None, Some("baz")]); + let col2 = Int32Array::from(vec![Some(1), Some(2), Some(3), Some(4)]); + let col3 = BooleanArray::from(vec![Some(true), Some(false), None, Some(false)]); + let batch = RecordBatch::try_new(schema.clone(), vec![ + Arc::new(col1) as ArrayRef, + Arc::new(col2) as ArrayRef, + Arc::new(col3) as ArrayRef, + ]) + .unwrap(); + data_file_writer.write(batch.clone()).await.unwrap(); + data_file_writer.close().await.unwrap() +} + +async fn write_v2_data_manifest_file( + table: &Table, + file_name: &str, + entry: impl IntoIterator, +) -> ManifestFile { + let manifest_file_writer = ManifestWriterBuilder::new( + table + .file_io() + .new_output(format!( + "{}/meta/{}", + table.metadata().location(), + file_name + )) + .unwrap(), + None, + vec![], + table.metadata().current_schema().clone(), + table.metadata().default_partition_spec().as_ref().clone(), + ); + let mut writer = manifest_file_writer.build_v2_data(); + for entry in entry { + writer + .add_existing_file( + entry.data_file().clone(), + entry.snapshot_id().unwrap(), + entry.sequence_number().unwrap(), + entry.file_sequence_number(), + ) + .unwrap(); + } + writer.write_manifest_file().await.unwrap() +} + +fn validate_summary( + snapshot: &SnapshotRef, + replaced: usize, + kept: usize, + created: usize, + entry_count: usize, +) { + let summary = snapshot.summary(); + assert_eq!( + summary + .additional_properties + .get(REPLACED_MANIFESTS_COUNT) + .unwrap(), + &replaced.to_string() + ); + assert_eq!( + summary + .additional_properties + .get(KEPT_MANIFESTS_COUNT) + .unwrap(), + &kept.to_string() + ); + assert_eq!( + summary + .additional_properties + .get(CREATED_MANIFESTS_COUNT) + .unwrap(), + &created.to_string() + ); + assert_eq!( + summary + .additional_properties + .get(PROCESSED_ENTRY_COUNT) + .unwrap(), + &entry_count.to_string() + ); +} + +async fn validate_manifest_entry( + manifest: &ManifestFile, + file_io: &FileIO, + snapshot_id: impl IntoIterator, + expected_files: impl IntoIterator, + expected_statuses: impl IntoIterator, +) { + let mut expect_snapshot_id = snapshot_id.into_iter(); + let mut expected_files = expected_files.into_iter(); + let mut expected_statuses = expected_statuses.into_iter(); + let manifest = manifest.load_manifest(file_io).await.unwrap(); + for e in manifest.entries() { + assert_eq!(e.file_path(), expected_files.next().unwrap().file_path()); + assert_eq!(e.snapshot_id().unwrap(), expect_snapshot_id.next().unwrap()); + assert_eq!(e.status(), expected_statuses.next().unwrap()); + } +} + +#[tokio::test] +async fn test_rewrite_manifest_append_directly() { + let fixture = get_shared_containers(); + let rest_catalog = RestCatalog::new(fixture.catalog_config.clone()); + let ns = random_ns().await; + let schema = test_schema(); + + // Create table + let table_creation = TableCreation::builder() + .name("t1".to_string()) + .schema(schema.clone()) + .build(); + let table = rest_catalog + .create_table(ns.name(), table_creation) + .await + .unwrap(); + + // Append a data file + let data_file = generate_data_file(&table, "d1").await; + let tx = Transaction::new(&table); + let mut append_action = tx.fast_append(None, vec![]).unwrap(); + append_action.add_data_files(data_file.clone()).unwrap(); + let table = append_action + .apply() + .await + .unwrap() + .commit(&rest_catalog) + .await + .unwrap(); + let snapshot_id = table.metadata().current_snapshot_id().unwrap(); + + // Rewrite manifest diretcly + let tx = Transaction::new(&table); + let rewrite_action = tx + .rewrite_manifest( + Some(Box::new(|_data_file| "".to_string())), + None, + None, + None, + vec![], + ) + .unwrap(); + let table = rewrite_action + .apply() + .await + .unwrap() + .commit(&rest_catalog) + .await + .unwrap(); + + // Check result + assert!(table.metadata().current_snapshot_id().unwrap() != snapshot_id); + let manifest_list = table + .metadata() + .current_snapshot() + .unwrap() + .load_manifest_list(table.file_io(), table.metadata()) + .await + .unwrap(); + assert!(manifest_list.entries().len() == 1); + validate_manifest_entry( + &manifest_list.entries()[0], + table.file_io(), + vec![snapshot_id], + data_file, + vec![ManifestStatus::Existing], + ) + .await; +} + +#[tokio::test] +async fn test_rewrite_manifest_append_directly_combine() { + let fixture = get_shared_containers(); + let rest_catalog = RestCatalog::new(fixture.catalog_config.clone()); + let ns = random_ns().await; + let schema = test_schema(); + + // Create table + let table_creation = TableCreation::builder() + .name("t1".to_string()) + .schema(schema.clone()) + .build(); + let mut table = rest_catalog + .create_table(ns.name(), table_creation) + .await + .unwrap(); + + // Append data file twice to generate two manifset file + for i in 0..2 { + let data_file = generate_data_file(&table, &i.to_string()).await; + let tx = Transaction::new(&table); + let mut append_action = tx.fast_append(None, vec![]).unwrap(); + append_action.add_data_files(data_file.clone()).unwrap(); + table = append_action + .apply() + .await + .unwrap() + .commit(&rest_catalog) + .await + .unwrap(); + } + let manifest_list = table + .metadata() + .current_snapshot() + .unwrap() + .load_manifest_list(table.file_io(), table.metadata()) + .await + .unwrap(); + assert_eq!(manifest_list.entries().len(), 2); + let expect_entries = { + let mut entries = vec![]; + for manifest_file in manifest_list.entries() { + let manifest = manifest_file.load_manifest(table.file_io()).await.unwrap(); + assert!(manifest.entries().len() == 1); + entries.push(manifest.entries()[0].clone()); + } + entries + }; + + // Rewrite manifest diretcly + let tx = Transaction::new(&table); + let rewrite_action = tx + .rewrite_manifest( + Some(Box::new(|_data_file| "".to_string())), + None, + None, + None, + vec![], + ) + .unwrap(); + let table = rewrite_action + .apply() + .await + .unwrap() + .commit(&rest_catalog) + .await + .unwrap(); + + // Check result + let manifest_list = table + .metadata() + .current_snapshot() + .unwrap() + .load_manifest_list(table.file_io(), table.metadata()) + .await + .unwrap(); + assert!(manifest_list.entries().len() == 1); + let manifest_file = manifest_list.entries()[0] + .load_manifest(table.file_io()) + .await + .unwrap(); + assert!(manifest_file.entries().len() == 2); + for entry in manifest_file.entries() { + expect_entries + .iter() + .find(|e| e.file_path() == entry.file_path()) + .unwrap(); + expect_entries + .iter() + .find(|e| e.snapshot_id() == entry.snapshot_id()) + .unwrap(); + assert_eq!(entry.status(), ManifestStatus::Existing); + } +} + +#[tokio::test] +async fn test_rewrite_manifest_separate() { + let fixture = get_shared_containers(); + let rest_catalog = RestCatalog::new(fixture.catalog_config.clone()); + let ns = random_ns().await; + let schema = test_schema(); + + // Create table + let table_creation = TableCreation::builder() + .name("t1".to_string()) + .schema(schema.clone()) + .build(); + let mut table = rest_catalog + .create_table(ns.name(), table_creation) + .await + .unwrap(); + + // Append two data file + let data_file1 = generate_data_file(&table, "d1").await; + let data_file2 = generate_data_file(&table, "d2").await; + let tx = Transaction::new(&table); + let mut append_action = tx.fast_append(None, vec![]).unwrap(); + append_action + .add_data_files(data_file1.into_iter().chain(data_file2.into_iter())) + .unwrap(); + table = append_action + .apply() + .await + .unwrap() + .commit(&rest_catalog) + .await + .unwrap(); + let manifest_list = table + .metadata() + .current_snapshot() + .unwrap() + .load_manifest_list(table.file_io(), table.metadata()) + .await + .unwrap(); + assert_eq!(manifest_list.entries().len(), 1); + let manifest = manifest_list.entries()[0] + .load_manifest(table.file_io()) + .await + .unwrap(); + assert_eq!(manifest.entries().len(), 2); + let (expect_entries, _) = manifest.into_parts(); + + // Rewrite manifest to separate two data file + let tx = Transaction::new(&table); + let rewrite_action = tx + .rewrite_manifest( + Some(Box::new(|data_file| data_file.file_path().to_string())), + None, + None, + None, + vec![], + ) + .unwrap(); + let table = rewrite_action + .apply() + .await + .unwrap() + .commit(&rest_catalog) + .await + .unwrap(); + + // Check result + let manifest_list = table + .metadata() + .current_snapshot() + .unwrap() + .load_manifest_list(table.file_io(), table.metadata()) + .await + .unwrap(); + assert_eq!(manifest_list.entries().len(), 2); + for manifest_file in manifest_list.entries() { + let manifest = manifest_file.load_manifest(table.file_io()).await.unwrap(); + assert!(manifest.entries().len() == 1); + let entry = manifest.entries()[0].clone(); + expect_entries + .iter() + .find(|e| e.file_path() == entry.file_path()) + .unwrap(); + expect_entries + .iter() + .find(|e| e.snapshot_id() == entry.snapshot_id()) + .unwrap(); + assert_eq!(entry.status(), ManifestStatus::Existing); + } +} + +#[tokio::test] +async fn test_rewrite_manifest_filter() { + let fixture = get_shared_containers(); + let rest_catalog = RestCatalog::new(fixture.catalog_config.clone()); + let ns = random_ns().await; + let schema = test_schema(); + + // Create table + let table_creation = TableCreation::builder() + .name("t1".to_string()) + .schema(schema.clone()) + .build(); + let mut table = rest_catalog + .create_table(ns.name(), table_creation) + .await + .unwrap(); + + for i in 1..4 { + let data_file = generate_data_file(&table, &format!("d{}", i)).await; + let tx = Transaction::new(&table); + let mut append_action = tx.fast_append(None, vec![]).unwrap(); + append_action.add_data_files(data_file.clone()).unwrap(); + table = append_action + .apply() + .await + .unwrap() + .commit(&rest_catalog) + .await + .unwrap(); + } + let manifest_list = table + .metadata() + .current_snapshot() + .unwrap() + .load_manifest_list(table.file_io(), table.metadata()) + .await + .unwrap(); + assert_eq!(manifest_list.entries().len(), 3); + let mut expect_entries = vec![]; + let mut expect_entry3 = vec![]; + for manifest_file in manifest_list.entries() { + let manifest = manifest_file.load_manifest(table.file_io()).await.unwrap(); + assert!(manifest.entries().len() == 1); + if manifest.entries()[0].file_path().contains("d3") { + expect_entry3.push(manifest.entries()[0].clone()); + } else { + expect_entries.push(manifest.entries()[0].clone()); + } + } + assert_eq!(expect_entry3.len(), 1); + assert_eq!(expect_entries.len(), 2); + + // Rewrite manifest to separate two data file + let tx = Transaction::new(&table); + let file_io_clone = table.file_io().clone(); + let rewrite_action = tx + .rewrite_manifest( + Some(Box::new(|data_file| data_file.file_path().to_string())), + Some(Box::new(move |entry| { + let entry_clone = entry.clone(); + let file_io = file_io_clone.clone(); + async move { + let manifest = entry_clone.load_manifest(&file_io).await.unwrap(); + !manifest + .entries() + .iter() + .any(|e| e.file_path().contains("d3")) + } + .boxed() + })), + None, + None, + vec![], + ) + .unwrap(); + let table = rewrite_action + .apply() + .await + .unwrap() + .commit(&rest_catalog) + .await + .unwrap(); + + // Check result + let manifest_list = table + .metadata() + .current_snapshot() + .unwrap() + .load_manifest_list(table.file_io(), table.metadata()) + .await + .unwrap(); + assert_eq!(manifest_list.entries().len(), 2); + for manifest_file in manifest_list.entries() { + let manifest = manifest_file.load_manifest(table.file_io()).await.unwrap(); + assert!(manifest.entries().len() == 1 || manifest.entries().len() == 2); + if manifest.entries().len() == 1 { + let entry = manifest.entries()[0].clone(); + assert_eq!(entry.file_path(), expect_entry3[0].file_path()); + assert_eq!(entry.snapshot_id(), expect_entry3[0].snapshot_id()); + assert_eq!(entry.status(), ManifestStatus::Existing); + } else { + for entry in manifest.entries() { + expect_entries + .iter() + .find(|e| e.file_path() == entry.file_path()) + .unwrap(); + expect_entries + .iter() + .find(|e| e.snapshot_id() == entry.snapshot_id()) + .unwrap(); + assert_eq!(entry.status(), ManifestStatus::Existing); + } + } + } +} + +// # TODO +// Test rewrite manifest target size +// # TODO +// Test concurrent append with rewrite manifest + +#[tokio::test] +async fn test_basic_manifest_replacement() { + let fixture = get_shared_containers(); + let rest_catalog = RestCatalog::new(fixture.catalog_config.clone()); + let ns = random_ns().await; + let schema = test_schema(); + + // Create table + let table_creation = TableCreation::builder() + .name("t1".to_string()) + .schema(schema.clone()) + .build(); + let mut table = rest_catalog + .create_table(ns.name(), table_creation) + .await + .unwrap(); + + // Append twice and each time append two data file + let data_file1 = generate_data_file(&table, "d1-1").await; + let data_file2 = generate_data_file(&table, "d1-2").await; + let tx = Transaction::new(&table); + let mut append_action = tx.fast_append(None, vec![]).unwrap(); + append_action + .add_data_files(data_file1.iter().chain(data_file2.iter()).cloned()) + .unwrap(); + table = append_action + .apply() + .await + .unwrap() + .commit(&rest_catalog) + .await + .unwrap(); + let data_file3 = generate_data_file(&table, "d2-1").await; + let data_file4 = generate_data_file(&table, "d2-2").await; + let tx = Transaction::new(&table); + let mut append_action = tx.fast_append(None, vec![]).unwrap(); + append_action + .add_data_files(data_file3.iter().chain(data_file4.iter()).cloned()) + .unwrap(); + table = append_action + .apply() + .await + .unwrap() + .commit(&rest_catalog) + .await + .unwrap(); + let manifest_list = table + .metadata() + .current_snapshot() + .unwrap() + .load_manifest_list(table.file_io(), table.metadata()) + .await + .unwrap(); + assert_eq!(manifest_list.entries().len(), 2); + assert_eq!(manifest_list.entries()[0].added_files_count, Some(2)); + assert_eq!(manifest_list.entries()[1].added_files_count, Some(2)); + + // Separate the first manifest file into two manifest files manually + let first_snapshot_id = manifest_list.entries()[1].added_snapshot_id; + let second_snapshot_id = manifest_list.entries()[0].added_snapshot_id; + let delete_manifest = manifest_list.entries()[1].clone(); + let added_manifest1 = + write_v2_data_manifest_file(&table, "m1-1", vec![ManifestEntry::builder() + .status(ManifestStatus::Existing) + .snapshot_id(first_snapshot_id) + .sequence_number(0) + .file_sequence_number(0) + .data_file(data_file1[0].clone()) + .build()]) + .await; + let added_manifest2 = + write_v2_data_manifest_file(&table, "m1-2", vec![ManifestEntry::builder() + .status(ManifestStatus::Existing) + .snapshot_id(first_snapshot_id) + .sequence_number(0) + .file_sequence_number(0) + .data_file(data_file2[0].clone()) + .build()]) + .await; + let mut rewrite_manifest = Transaction::new(&table) + .rewrite_manifest::<()>(None, None, None, None, vec![]) + .unwrap(); + rewrite_manifest.delete_manifset(delete_manifest).unwrap(); + rewrite_manifest.add_manifest(added_manifest1).unwrap(); + rewrite_manifest.add_manifest(added_manifest2).unwrap(); + let table = rewrite_manifest + .apply() + .await + .unwrap() + .commit(&rest_catalog) + .await + .unwrap(); + + // Check result + let manifest_list = table + .metadata() + .current_snapshot() + .unwrap() + .load_manifest_list(table.file_io(), table.metadata()) + .await + .unwrap(); + assert_eq!(manifest_list.entries().len(), 3); + assert!(manifest_list.entries()[0].manifest_path.contains("m1-1")); + assert!(manifest_list.entries()[1].manifest_path.contains("m1-2")); + validate_summary(table.metadata().current_snapshot().unwrap(), 1, 1, 2, 0); + println!("{:?}", manifest_list.entries()); + validate_manifest_entry( + &manifest_list.entries()[0], + table.file_io(), + vec![first_snapshot_id], + data_file1, + vec![ManifestStatus::Existing], + ) + .await; + validate_manifest_entry( + &manifest_list.entries()[1], + table.file_io(), + vec![first_snapshot_id], + data_file2, + vec![ManifestStatus::Existing], + ) + .await; + validate_manifest_entry( + &manifest_list.entries()[2], + table.file_io(), + vec![second_snapshot_id, second_snapshot_id], + data_file3.iter().chain(data_file4.iter()).cloned(), + vec![ManifestStatus::Added; 2], + ) + .await; +} From 8a59c6580fbbd66fd60dae5c05532421f5a73615 Mon Sep 17 00:00:00 2001 From: ZENOTME Date: Mon, 28 Apr 2025 22:42:48 +0800 Subject: [PATCH 2/2] fix test and refine code --- crates/iceberg/src/spec/snapshot_summary.rs | 11 ------ crates/iceberg/src/transaction/append.rs | 2 +- crates/iceberg/src/transaction/mod.rs | 6 ++-- .../src/transaction/rewrite_manifest.rs | 34 +++++++++---------- crates/iceberg/src/transaction/snapshot.rs | 22 +++++++----- .../shared_tests/rewrite_manifest_test.rs | 14 +++++--- 6 files changed, 42 insertions(+), 47 deletions(-) diff --git a/crates/iceberg/src/spec/snapshot_summary.rs b/crates/iceberg/src/spec/snapshot_summary.rs index 05f9fb8e63..f54209b017 100644 --- a/crates/iceberg/src/spec/snapshot_summary.rs +++ b/crates/iceberg/src/spec/snapshot_summary.rs @@ -335,17 +335,6 @@ pub(crate) fn update_snapshot_summaries( previous_summary: Option<&Summary>, truncate_full_table: bool, ) -> Result { - // Validate that the operation is supported - if summary.operation != Operation::Append - && summary.operation != Operation::Overwrite - && summary.operation != Operation::Delete - { - return Err(Error::new( - ErrorKind::DataInvalid, - "Operation is not supported.", - )); - } - let mut summary = match previous_summary { Some(prev_summary) if truncate_full_table && summary.operation == Operation::Overwrite => { truncate_table_summary(summary, prev_summary) diff --git a/crates/iceberg/src/transaction/append.rs b/crates/iceberg/src/transaction/append.rs index 474b32becb..bbff8b6068 100644 --- a/crates/iceberg/src/transaction/append.rs +++ b/crates/iceberg/src/transaction/append.rs @@ -284,7 +284,7 @@ mod tests { new_snapshot.sequence_number() ); - // check manifset + // check manifest let manifest = manifest_list.entries()[0] .load_manifest(table.file_io()) .await diff --git a/crates/iceberg/src/transaction/mod.rs b/crates/iceberg/src/transaction/mod.rs index d3ee13412c..0693b1f92e 100644 --- a/crates/iceberg/src/transaction/mod.rs +++ b/crates/iceberg/src/transaction/mod.rs @@ -31,7 +31,7 @@ use std::hash::Hash; use std::mem::discriminant; use std::sync::Arc; -use rewrite_manifest::{ClusterFunc, PredicateFunc, RewriteManifsetAction}; +use rewrite_manifest::{ClusterFunc, PredicateFunc, RewriteManifestAction}; use uuid::Uuid; use crate::error::Result; @@ -180,7 +180,7 @@ impl<'a> Transaction<'a> { snapshot_id: Option, commit_uuid: Option, key_metadata: Vec, - ) -> Result> { + ) -> Result> { let snapshot_id = if let Some(snapshot_id) = snapshot_id { if self .current_table @@ -197,7 +197,7 @@ impl<'a> Transaction<'a> { } else { self.generate_unique_snapshot_id() }; - RewriteManifsetAction::new( + RewriteManifestAction::new( self, cluster_by_func, manifest_predicate, diff --git a/crates/iceberg/src/transaction/rewrite_manifest.rs b/crates/iceberg/src/transaction/rewrite_manifest.rs index b21c9986f6..0465e736d0 100644 --- a/crates/iceberg/src/transaction/rewrite_manifest.rs +++ b/crates/iceberg/src/transaction/rewrite_manifest.rs @@ -45,10 +45,10 @@ pub type ClusterFunc = Box T>; pub type PredicateFunc = Box Pin + Send>>>; /// Action used for rewriting manifests for a table. -pub struct RewriteManifsetAction<'a, T> { +pub struct RewriteManifestAction<'a, T> { cluster_by_func: Option>, - manifset_predicate: Option, - manifset_writers: HashMap<(T, i32), ManifestWriter>, + manifest_predicate: Option, + manifest_writers: HashMap<(T, i32), ManifestWriter>, // Manifest file that user added to the snapshot added_manifests: Vec, @@ -68,7 +68,7 @@ pub struct RewriteManifsetAction<'a, T> { process_entry_count: usize, } -impl<'a, T: Hash + Eq> RewriteManifsetAction<'a, T> { +impl<'a, T: Hash + Eq> RewriteManifestAction<'a, T> { pub(crate) fn new( tx: Transaction<'a>, cluster_by_func: Option>, @@ -80,8 +80,8 @@ impl<'a, T: Hash + Eq> RewriteManifsetAction<'a, T> { ) -> Result { Ok(Self { cluster_by_func, - manifset_predicate: manifest_predicate, - manifset_writers: HashMap::new(), + manifest_predicate, + manifest_writers: HashMap::new(), added_manifests: vec![], deleted_manifests: vec![], new_manifests: vec![], @@ -98,7 +98,7 @@ impl<'a, T: Hash + Eq> RewriteManifsetAction<'a, T> { }) } - /// Add the manifset file for new snapshot + /// Add the manifest file for new snapshot pub fn add_manifest(&mut self, manifest: ManifestFile) -> Result<()> { if manifest.has_added_files() { return Err(Error::new( @@ -139,9 +139,9 @@ impl<'a, T: Hash + Eq> RewriteManifsetAction<'a, T> { Ok(()) } - /// Delete the manifset file from snapshot - pub fn delete_manifset(&mut self, manifset: ManifestFile) -> Result<()> { - self.deleted_manifests.push(manifset); + /// Delete the manifest file from snapshot + pub fn delete_manifest(&mut self, manifest: ManifestFile) -> Result<()> { + self.deleted_manifests.push(manifest); Ok(()) } @@ -152,7 +152,7 @@ impl<'a, T: Hash + Eq> RewriteManifsetAction<'a, T> { #[inline] async fn if_rewrite(&self, manifest: &ManifestFile) -> bool { // Always rewrite if not predicate is provided - if let Some(predicate) = self.manifset_predicate.as_ref() { + if let Some(predicate) = self.manifest_predicate.as_ref() { predicate(manifest).await } else { true @@ -174,8 +174,6 @@ impl<'a, T: Hash + Eq> RewriteManifsetAction<'a, T> { .load_manifest(self.snapshot_produce_action.tx.current_table.file_io()) .await? .into_parts(); - // # TODO - // If we ignore delete entry here, how to clean the file? for entry in manifest_entries.into_iter().filter(|e| e.is_alive()) { let key = ( self.cluster_by_func @@ -185,7 +183,7 @@ impl<'a, T: Hash + Eq> RewriteManifsetAction<'a, T> { ), manifest.partition_spec_id, ); - match self.manifset_writers.entry(key) { + match self.manifest_writers.entry(key) { Entry::Occupied(mut e) => { // # TODO // Close when file reach target size and reset the writer @@ -205,7 +203,7 @@ impl<'a, T: Hash + Eq> RewriteManifsetAction<'a, T> { } } // write all manifest files - for (_, writer) in self.manifset_writers.drain() { + for (_, writer) in self.manifest_writers.drain() { let manifest_file = writer.write_manifest_file().await?; self.new_manifests.push(manifest_file); } @@ -349,7 +347,7 @@ impl<'a, T: Hash + Eq> RewriteManifsetAction<'a, T> { self.snapshot_produce_action .apply( - RewriteManifsetActionOperation { + RewriteManifestActionOperation { existing_manifests, summary, }, @@ -359,12 +357,12 @@ impl<'a, T: Hash + Eq> RewriteManifsetAction<'a, T> { } } -struct RewriteManifsetActionOperation { +struct RewriteManifestActionOperation { existing_manifests: Vec, summary: HashMap, } -impl SnapshotProduceOperation for RewriteManifsetActionOperation { +impl SnapshotProduceOperation for RewriteManifestActionOperation { fn operation(&self) -> Operation { Operation::Replace } diff --git a/crates/iceberg/src/transaction/snapshot.rs b/crates/iceberg/src/transaction/snapshot.rs index ab0fcb6353..05385dea21 100644 --- a/crates/iceberg/src/transaction/snapshot.rs +++ b/crates/iceberg/src/transaction/snapshot.rs @@ -17,6 +17,7 @@ use std::collections::HashMap; use std::future::Future; +use std::mem; use std::ops::RangeFrom; use uuid::Uuid; @@ -221,8 +222,10 @@ impl<'a> SnapshotProduceAction<'a> { } // Write manifest file for added data files and return the ManifestFile for ManifestList. - async fn write_added_manifest(&mut self) -> Result { - let added_data_files = std::mem::take(&mut self.added_data_files); + async fn write_added_manifest( + &mut self, + added_data_files: Vec, + ) -> Result { let snapshot_id = self.snapshot_id; let format_version = self.tx.current_table.metadata().format_version(); let manifest_entries = added_data_files.into_iter().map(|data_file| { @@ -252,13 +255,15 @@ impl<'a> SnapshotProduceAction<'a> { snapshot_produce_operation: &OP, manifest_process: &MP, ) -> Result> { - let added_manifest = self.write_added_manifest().await?; - let existing_manifests = snapshot_produce_operation.existing_manifest(self).await?; + let mut manifest_files = vec![]; + if !self.added_data_files.is_empty() { + let added_data_files = mem::take(&mut self.added_data_files); + let added_manifest = self.write_added_manifest(added_data_files).await?; + manifest_files.push(added_manifest); + } // # TODO // Support process delete entries. - - let mut manifest_files = vec![added_manifest]; - manifest_files.extend(existing_manifests); + manifest_files.extend(snapshot_produce_operation.existing_manifest(self).await?); let manifest_files = manifest_process.process_manifeset(manifest_files); Ok(manifest_files) } @@ -347,8 +352,7 @@ impl<'a> SnapshotProduceAction<'a> { .map_err(|err| { Error::new(ErrorKind::Unexpected, "Failed to create snapshot summary.") .with_source(err) - }) - .unwrap(); + })?; let manifest_list_path = self.generate_manifest_list_file_path(0); diff --git a/crates/integration_tests/tests/shared_tests/rewrite_manifest_test.rs b/crates/integration_tests/tests/shared_tests/rewrite_manifest_test.rs index dbc4b7b4bc..1940f36317 100644 --- a/crates/integration_tests/tests/shared_tests/rewrite_manifest_test.rs +++ b/crates/integration_tests/tests/shared_tests/rewrite_manifest_test.rs @@ -65,7 +65,11 @@ async fn generate_data_file(table: &Table, name: &str) -> Vec { location_generator.clone(), file_name_generator.clone(), ); - let data_file_writer_builder = DataFileWriterBuilder::new(parquet_writer_builder, None, 1); + let data_file_writer_builder = DataFileWriterBuilder::new( + parquet_writer_builder, + None, + table.metadata().default_partition_spec_id(), + ); let mut data_file_writer = data_file_writer_builder.build().await.unwrap(); let col1 = StringArray::from(vec![Some("foo"), Some("bar"), None, Some("baz")]); let col2 = Int32Array::from(vec![Some(1), Some(2), Some(3), Some(4)]); @@ -256,7 +260,7 @@ async fn test_rewrite_manifest_append_directly_combine() { .await .unwrap(); - // Append data file twice to generate two manifset file + // Append data file twice to generate two manifest file for i in 0..2 { let data_file = generate_data_file(&table, &i.to_string()).await; let tx = Transaction::new(&table); @@ -482,7 +486,7 @@ async fn test_rewrite_manifest_filter() { let file_io_clone = table.file_io().clone(); let rewrite_action = tx .rewrite_manifest( - Some(Box::new(|data_file| data_file.file_path().to_string())), + Some(Box::new(|_data_file| "file".to_string())), Some(Box::new(move |entry| { let entry_clone = entry.clone(); let file_io = file_io_clone.clone(); @@ -524,7 +528,7 @@ async fn test_rewrite_manifest_filter() { let entry = manifest.entries()[0].clone(); assert_eq!(entry.file_path(), expect_entry3[0].file_path()); assert_eq!(entry.snapshot_id(), expect_entry3[0].snapshot_id()); - assert_eq!(entry.status(), ManifestStatus::Existing); + assert_eq!(entry.status(), ManifestStatus::Added); } else { for entry in manifest.entries() { expect_entries @@ -628,7 +632,7 @@ async fn test_basic_manifest_replacement() { let mut rewrite_manifest = Transaction::new(&table) .rewrite_manifest::<()>(None, None, None, None, vec![]) .unwrap(); - rewrite_manifest.delete_manifset(delete_manifest).unwrap(); + rewrite_manifest.delete_manifest(delete_manifest).unwrap(); rewrite_manifest.add_manifest(added_manifest1).unwrap(); rewrite_manifest.add_manifest(added_manifest2).unwrap(); let table = rewrite_manifest