From 6042e41c6ea46d23dc4898ea3b94f712e58d3499 Mon Sep 17 00:00:00 2001
From: ZENOTME
Date: Mon, 14 Jul 2025 22:19:29 +0800
Subject: [PATCH 1/4] add merge append action

---
 crates/iceberg/src/spec/manifest/entry.rs     |   6 +
 crates/iceberg/src/transaction/append.rs      |   2 +-
 .../iceberg/src/transaction/merge_append.rs   | 332 ++++++++++++++++++
 crates/iceberg/src/transaction/mod.rs         |  10 +
 crates/iceberg/src/transaction/snapshot.rs    |  24 +-
 crates/iceberg/src/utils.rs                   | 145 ++++++++
 .../tests/shared_tests/merge_append_test.rs   | 210 +++++++++++
 .../tests/shared_tests/mod.rs                 |   1 +
 8 files changed, 718 insertions(+), 12 deletions(-)
 create mode 100644 crates/iceberg/src/transaction/merge_append.rs
 create mode 100644 crates/integration_tests/tests/shared_tests/merge_append_test.rs

diff --git a/crates/iceberg/src/spec/manifest/entry.rs b/crates/iceberg/src/spec/manifest/entry.rs
index 7ba9efb3b9..51b667f022 100644
--- a/crates/iceberg/src/spec/manifest/entry.rs
+++ b/crates/iceberg/src/spec/manifest/entry.rs
@@ -130,6 +130,12 @@ impl ManifestEntry {
         self.snapshot_id
     }
 
+    /// File sequence number
+    #[inline]
+    pub fn file_sequence_number(&self) -> Option<i64> {
+        self.file_sequence_number
+    }
+
     /// Data sequence number.
     #[inline]
     pub fn sequence_number(&self) -> Option<i64> {
diff --git a/crates/iceberg/src/transaction/append.rs b/crates/iceberg/src/transaction/append.rs
index f248543df2..f96d9f73f2 100644
--- a/crates/iceberg/src/transaction/append.rs
+++ b/crates/iceberg/src/transaction/append.rs
@@ -108,7 +108,7 @@ impl TransactionAction for FastAppendAction {
     }
 }
 
-struct FastAppendOperation;
+pub(crate) struct FastAppendOperation;
 
 impl SnapshotProduceOperation for FastAppendOperation {
     fn operation(&self) -> Operation {
diff --git a/crates/iceberg/src/transaction/merge_append.rs b/crates/iceberg/src/transaction/merge_append.rs
new file mode 100644
index 0000000000..19f7fa0530
--- /dev/null
+++ b/crates/iceberg/src/transaction/merge_append.rs
@@ -0,0 +1,332 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::collections::{BTreeMap, HashMap};
+use std::pin::Pin;
+use std::sync::Arc;
+
+use async_trait::async_trait;
+use uuid::Uuid;
+
+use crate::Result;
+use crate::io::FileIO;
+use crate::spec::{DataFile, ManifestContentType, ManifestFile, ManifestStatus, ManifestWriter};
+use crate::table::Table;
+use crate::transaction::append::FastAppendOperation;
+use crate::transaction::snapshot::{DefaultManifestProcess, ManifestProcess, SnapshotProducer};
+use crate::transaction::{ActionCommit, TransactionAction};
+use crate::utils::bin::ListPacker;
+
+/// MergeAppendAction is a transaction action similar to fast append except that it will merge manifests
+/// based on the target size.
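+///
+/// A minimal usage sketch (names here are illustrative; it assumes an existing
+/// `table`, a `catalog`, and already-written `data_files`):
+/// ```ignore
+/// let tx = Transaction::new(&table);
+/// let mut action = tx.merge_append()?;
+/// action.add_data_files(data_files)?;
+/// let tx = action.apply(tx)?;
+/// let table = tx.commit(&catalog).await?;
+/// ```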
+pub struct MergeAppendAction {
+    target_size_bytes: u32,
+    min_count_to_merge: u32,
+    check_duplicate: bool,
+    merge_enabled: bool,
+    // below are properties used to create SnapshotProducer when commit
+    commit_uuid: Option<Uuid>,
+    key_metadata: Option<Vec<u8>>,
+    snapshot_properties: HashMap<String, String>,
+    added_data_files: Vec<DataFile>,
+}
+
+/// Target size of manifest file when merging manifests.
+pub const MANIFEST_TARGET_SIZE_BYTES: &str = "commit.manifest.target-size-bytes";
+const MANIFEST_TARGET_SIZE_BYTES_DEFAULT: u32 = 8 * 1024 * 1024; // 8 MB
+/// Minimum number of manifests to merge.
+pub const MANIFEST_MIN_MERGE_COUNT: &str = "commit.manifest.min-count-to-merge";
+const MANIFEST_MIN_MERGE_COUNT_DEFAULT: u32 = 100;
+/// Whether merging manifests is enabled.
+pub const MANIFEST_MERGE_ENABLED: &str = "commit.manifest-merge.enabled";
+const MANIFEST_MERGE_ENABLED_DEFAULT: bool = false;
+
+impl MergeAppendAction {
+    pub(crate) fn new(table: &Table) -> Result<Self> {
+        let target_size_bytes: u32 = table
+            .metadata()
+            .properties()
+            .get(MANIFEST_TARGET_SIZE_BYTES)
+            .and_then(|s| s.parse().ok())
+            .unwrap_or(MANIFEST_TARGET_SIZE_BYTES_DEFAULT);
+        let min_count_to_merge: u32 = table
+            .metadata()
+            .properties()
+            .get(MANIFEST_MIN_MERGE_COUNT)
+            .and_then(|s| s.parse().ok())
+            .unwrap_or(MANIFEST_MIN_MERGE_COUNT_DEFAULT);
+        let merge_enabled = table
+            .metadata()
+            .properties()
+            .get(MANIFEST_MERGE_ENABLED)
+            .and_then(|s| s.parse().ok())
+            .unwrap_or(MANIFEST_MERGE_ENABLED_DEFAULT);
+        Ok(Self {
+            check_duplicate: true,
+            target_size_bytes,
+            min_count_to_merge,
+            merge_enabled,
+            commit_uuid: None,
+            key_metadata: None,
+            snapshot_properties: HashMap::default(),
+            added_data_files: vec![],
+        })
+    }
+
+    pub fn with_check_duplicate(mut self, v: bool) -> Self {
+        self.check_duplicate = v;
+        self
+    }
+
+    pub fn set_commit_uuid(mut self, commit_uuid: Uuid) -> Self {
+        self.commit_uuid = Some(commit_uuid);
+        self
+    }
+
+    pub fn set_key_metadata(mut self, key_metadata: Vec<u8>) -> Self {
+        self.key_metadata = Some(key_metadata);
+        self
+    }
+
+    pub fn set_snapshot_properties(mut self, snapshot_properties: HashMap<String, String>) -> Self {
+        self.snapshot_properties = snapshot_properties;
+        self
+    }
+
+    /// Add data files to the snapshot.
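+    /// The files are buffered in the action and written into a new manifest
+    /// when the action commits; if `check_duplicate` is enabled, files already
+    /// referenced by the current table state are rejected at commit time.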
+ pub fn add_data_files( + &mut self, + data_files: impl IntoIterator, + ) -> Result<&mut Self> { + self.added_data_files.extend(data_files); + Ok(self) + } +} + +#[async_trait] +impl TransactionAction for MergeAppendAction { + async fn commit(self: Arc, table: &Table) -> Result { + let snapshot_producer = SnapshotProducer::new( + table, + self.commit_uuid.unwrap_or_else(Uuid::now_v7), + self.key_metadata.clone(), + self.snapshot_properties.clone(), + self.added_data_files.clone(), + ); + + // validate added files + snapshot_producer.validate_added_data_files(&self.added_data_files)?; + + // Checks duplicate files + if self.check_duplicate { + snapshot_producer + .validate_duplicate_files(&self.added_data_files) + .await?; + } + + if self.merge_enabled { + snapshot_producer + .commit(FastAppendOperation, MergeManifsetProcess { + target_size_bytes: self.target_size_bytes, + min_count_to_merge: self.min_count_to_merge, + }) + .await + } else { + snapshot_producer + .commit(FastAppendOperation, DefaultManifestProcess) + .await + } + } +} + +struct MergeManifestManager { + target_size_bytes: u32, + min_count_to_merge: u32, + content: ManifestContentType, +} + +impl MergeManifestManager { + pub fn new( + target_size_bytes: u32, + min_count_to_merge: u32, + content: ManifestContentType, + ) -> Self { + Self { + target_size_bytes, + min_count_to_merge, + content, + } + } + + fn group_by_spec(&self, manifests: Vec) -> BTreeMap> { + let mut grouped_manifests = BTreeMap::new(); + for manifest in manifests { + grouped_manifests + .entry(manifest.partition_spec_id) + .or_insert_with(Vec::new) + .push(manifest); + } + grouped_manifests + } + + async fn merge_bin( + &self, + snapshot_id: i64, + file_io: FileIO, + manifest_bin: Vec, + mut writer: ManifestWriter, + ) -> Result { + for manifest_file in manifest_bin { + let manifest_file = manifest_file.load_manifest(&file_io).await?; + for manifest_entry in manifest_file.entries() { + if manifest_entry.status() == ManifestStatus::Deleted + && manifest_entry + .snapshot_id() + .is_some_and(|id| id == snapshot_id) + { + //only files deleted by this snapshot should be added to the new manifest + writer.add_delete_entry(manifest_entry.as_ref().clone())?; + } else if manifest_entry.status() == ManifestStatus::Added + && manifest_entry + .snapshot_id() + .is_some_and(|id| id == snapshot_id) + { + //added entries from this snapshot are still added, otherwise they should be existing + writer.add_entry(manifest_entry.as_ref().clone())?; + } else if manifest_entry.status() != ManifestStatus::Deleted { + // add all non-deleted files from the old manifest as existing files + writer.add_existing_entry(manifest_entry.as_ref().clone())?; + } + } + } + + writer.write_manifest_file().await + } + + async fn merge_group( + &self, + snapshot_produce: &mut SnapshotProducer<'_>, + first_manifest: &ManifestFile, + group_manifests: Vec, + ) -> Result> { + let packer: ListPacker = ListPacker::new(self.target_size_bytes); + let manifest_bins = + packer.pack(group_manifests, |manifest| manifest.manifest_length as u32); + + let manifest_merge_futures = manifest_bins + .into_iter() + .map(|manifest_bin| { + if manifest_bin.len() == 1 { + Ok(Box::pin(async { Ok(manifest_bin) }) + as Pin< + Box>> + Send>, + >) + } + // if the bin has the first manifest (the new data files or an appended manifest file) then only + // merge it if the number of manifests is above the minimum count. 
this is applied only to bins + // with an in-memory manifest so that large manifests don't prevent merging older groups. + else if manifest_bin + .iter() + .any(|manifest| manifest == first_manifest) + && manifest_bin.len() < self.min_count_to_merge as usize + { + Ok(Box::pin(async { Ok(manifest_bin) }) + as Pin< + Box>> + Send>, + >) + } else { + let writer = snapshot_produce.new_manifest_writer(self.content)?; + let snapshot_id = snapshot_produce.snapshot_id; + let file_io = snapshot_produce.table.file_io().clone(); + Ok((Box::pin(async move { + Ok(vec![ + self.merge_bin( + snapshot_id, + file_io, + manifest_bin, + writer, + ) + .await?, + ]) + })) + as Pin>> + Send>>) + } + }) + .collect::>> + Send>>>>>()?; + + let merged_bins: Vec> = + futures::future::join_all(manifest_merge_futures.into_iter()) + .await + .into_iter() + .collect::>>()?; + + Ok(merged_bins.into_iter().flatten().collect()) + } + + async fn merge_manifest( + &self, + snapshot_produce: &mut SnapshotProducer<'_>, + manifests: Vec, + ) -> Result> { + if manifests.is_empty() { + return Ok(manifests); + } + + let first_manifest = manifests[0].clone(); + + let group_manifests = self.group_by_spec(manifests); + + let mut merge_manifests = vec![]; + for (_spec_id, manifests) in group_manifests.into_iter().rev() { + merge_manifests.extend( + self.merge_group(snapshot_produce, &first_manifest, manifests) + .await?, + ); + } + + Ok(merge_manifests) + } +} + +struct MergeManifsetProcess { + target_size_bytes: u32, + min_count_to_merge: u32, +} + +impl ManifestProcess for MergeManifsetProcess { + async fn process_manifests( + &self, + snapshot_produce: &mut SnapshotProducer<'_>, + manifests: Vec, + ) -> Result> { + let (unmerg_data_manifests, unmerge_delete_manifest) = manifests + .into_iter() + .partition(|m| m.content == ManifestContentType::Data); + let mut data_manifests = { + let merge_manifest_manager = MergeManifestManager::new( + self.target_size_bytes, + self.min_count_to_merge, + ManifestContentType::Data, + ); + merge_manifest_manager + .merge_manifest(snapshot_produce, unmerg_data_manifests) + .await? + }; + data_manifests.extend(unmerge_delete_manifest); + Ok(data_manifests) + } +} diff --git a/crates/iceberg/src/transaction/mod.rs b/crates/iceberg/src/transaction/mod.rs index 06549a95c5..170e2f1815 100644 --- a/crates/iceberg/src/transaction/mod.rs +++ b/crates/iceberg/src/transaction/mod.rs @@ -56,6 +56,7 @@ use std::collections::HashMap; pub use action::*; mod append; +mod merge_append; mod snapshot; mod sort_order; mod update_location; @@ -78,6 +79,10 @@ use crate::spec::{ use crate::table::Table; use crate::transaction::action::BoxedTransactionAction; use crate::transaction::append::FastAppendAction; +use crate::transaction::merge_append::MergeAppendAction; +pub use crate::transaction::merge_append::{ + MANIFEST_MERGE_ENABLED, MANIFEST_MIN_MERGE_COUNT, MANIFEST_TARGET_SIZE_BYTES, +}; use crate::transaction::sort_order::ReplaceSortOrderAction; use crate::transaction::update_location::UpdateLocationAction; use crate::transaction::update_properties::UpdatePropertiesAction; @@ -148,6 +153,11 @@ impl Transaction { FastAppendAction::new() } + /// Creates a merge append action. + pub fn merge_append(&self) -> Result { + MergeAppendAction::new(&self.table) + } + /// Creates replace sort order action. 
pub fn replace_sort_order(&self) -> ReplaceSortOrderAction { ReplaceSortOrderAction::new() diff --git a/crates/iceberg/src/transaction/snapshot.rs b/crates/iceberg/src/transaction/snapshot.rs index 48dc2b5b90..a2562d19a8 100644 --- a/crates/iceberg/src/transaction/snapshot.rs +++ b/crates/iceberg/src/transaction/snapshot.rs @@ -51,26 +51,26 @@ pub(crate) trait SnapshotProduceOperation: Send + Sync { pub(crate) struct DefaultManifestProcess; impl ManifestProcess for DefaultManifestProcess { - fn process_manifests( + async fn process_manifests( &self, - _snapshot_produce: &SnapshotProducer<'_>, + _snapshot_produce: &mut SnapshotProducer<'_>, manifests: Vec, - ) -> Vec { - manifests + ) -> Result> { + Ok(manifests) } } pub(crate) trait ManifestProcess: Send + Sync { - fn process_manifests( + async fn process_manifests( &self, - snapshot_produce: &SnapshotProducer<'_>, + snapshot_produce: &mut SnapshotProducer<'_>, manifests: Vec, - ) -> Vec; + ) -> Result>; } pub(crate) struct SnapshotProducer<'a> { pub(crate) table: &'a Table, - snapshot_id: i64, + pub(crate) snapshot_id: i64, commit_uuid: Uuid, key_metadata: Option>, snapshot_properties: HashMap, @@ -186,7 +186,10 @@ impl<'a> SnapshotProducer<'a> { snapshot_id } - fn new_manifest_writer(&mut self, content: ManifestContentType) -> Result { + pub(crate) fn new_manifest_writer( + &mut self, + content: ManifestContentType, + ) -> Result { let new_manifest_path = format!( "{}/{}/{}-m{}.{}", self.table.metadata().location(), @@ -308,8 +311,7 @@ impl<'a> SnapshotProducer<'a> { // # TODO // Support process delete entries. - let manifest_files = manifest_process.process_manifests(self, manifest_files); - Ok(manifest_files) + manifest_process.process_manifests(self, manifest_files).await } // Returns a `Summary` of the current snapshot diff --git a/crates/iceberg/src/utils.rs b/crates/iceberg/src/utils.rs index 00d3e69bd3..1a60c82a77 100644 --- a/crates/iceberg/src/utils.rs +++ b/crates/iceberg/src/utils.rs @@ -40,3 +40,148 @@ pub(crate) fn available_parallelism() -> NonZeroUsize { NonZeroUsize::new(DEFAULT_PARALLELISM).unwrap() }) } + +pub mod bin { + use std::iter::Iterator; + use std::marker::PhantomData; + + use itertools::Itertools; + + struct Bin { + bin_weight: u32, + target_weight: u32, + items: Vec, + } + + impl Bin { + pub fn new(target_weight: u32) -> Self { + Bin { + bin_weight: 0, + target_weight, + items: Vec::new(), + } + } + + pub fn can_add(&self, weight: u32) -> bool { + self.bin_weight + weight <= self.target_weight + } + + pub fn add(&mut self, item: T, weight: u32) { + self.bin_weight += weight; + self.items.push(item); + } + + pub fn into_vec(self) -> Vec { + self.items + } + } + + /// ListPacker help to pack item into bin of item. Each bin has close to + /// target_weight. 
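+    /// Packing is first-fit: each item goes into the first bin that still has
+    /// room for its weight, otherwise a new bin is opened. A sketch of the
+    /// behavior (weights are the items themselves, mirroring the tests below):
+    /// ```ignore
+    /// let packer = ListPacker::new(10);
+    /// let packed = packer.pack(vec![3, 4, 5, 6, 2, 1], |&x| x);
+    /// // First-fit yields [[3, 4, 2, 1], [5], [6]].
+    /// ```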
+ pub(crate) struct ListPacker { + target_weight: u32, + _marker: PhantomData, + } + + impl ListPacker { + pub fn new(target_weight: u32) -> Self { + ListPacker { + target_weight, + _marker: PhantomData, + } + } + + pub fn pack(&self, items: Vec, weight_func: F) -> Vec> + where F: Fn(&T) -> u32 { + let mut bins: Vec> = vec![]; + for item in items { + let cur_weight = weight_func(&item); + let addable_bin = + if let Some(bin) = bins.iter_mut().find(|bin| bin.can_add(cur_weight)) { + bin + } else { + bins.push(Bin::new(self.target_weight)); + bins.last_mut().unwrap() + }; + addable_bin.add(item, cur_weight); + } + + bins.into_iter().map(|bin| bin.into_vec()).collect_vec() + } + } + + #[cfg(test)] + mod tests { + use super::*; + + #[test] + fn test_list_packer_basic_packing() { + let packer = ListPacker::new(10); + let items = vec![3, 4, 5, 6, 2, 1]; + + let packed = packer.pack(items, |&x| x); + + assert_eq!(packed.len(), 3); + assert!(packed[0].iter().sum::() == 10); + assert!(packed[1].iter().sum::() == 5); + assert!(packed[2].iter().sum::() == 6); + } + + #[test] + fn test_list_packer_with_complex_items() { + #[derive(Debug, PartialEq)] + struct Item { + name: String, + size: u32, + } + + let packer = ListPacker::new(15); + let items = vec![ + Item { + name: "A".to_string(), + size: 7, + }, + Item { + name: "B".to_string(), + size: 8, + }, + Item { + name: "C".to_string(), + size: 5, + }, + Item { + name: "D".to_string(), + size: 6, + }, + ]; + + let packed = packer.pack(items, |item| item.size); + + assert_eq!(packed.len(), 2); + assert!(packed[0].iter().map(|x| x.size).sum::() <= 15); + assert!(packed[1].iter().map(|x| x.size).sum::() <= 15); + } + + #[test] + fn test_list_packer_single_large_item() { + let packer = ListPacker::new(10); + let items = vec![15, 5, 3]; + + let packed = packer.pack(items, |&x| x); + + assert_eq!(packed.len(), 2); + assert!(packed[0].contains(&15)); + assert!(packed[1].iter().sum::() <= 10); + } + + #[test] + fn test_list_packer_empty_input() { + let packer = ListPacker::new(10); + let items: Vec = vec![]; + + let packed = packer.pack(items, |&x| x); + + assert_eq!(packed.len(), 0); + } + } +} diff --git a/crates/integration_tests/tests/shared_tests/merge_append_test.rs b/crates/integration_tests/tests/shared_tests/merge_append_test.rs new file mode 100644 index 0000000000..366a018d0c --- /dev/null +++ b/crates/integration_tests/tests/shared_tests/merge_append_test.rs @@ -0,0 +1,210 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Integration tests for rest catalog. 
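+//! This module exercises the merge append action against the shared REST
+//! catalog fixture.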
+ +use std::sync::{Arc, OnceLock}; + +use arrow_array::{ArrayRef, BooleanArray, Int32Array, RecordBatch, StringArray}; +use iceberg::spec::{ + DataFile, ManifestEntry, ManifestStatus, NestedField, PrimitiveType, Schema, Type, +}; +use iceberg::table::Table; +use iceberg::transaction::{ + ApplyTransactionAction, MANIFEST_MERGE_ENABLED, MANIFEST_MIN_MERGE_COUNT, + MANIFEST_TARGET_SIZE_BYTES, Transaction, +}; +use iceberg::writer::base_writer::data_file_writer::DataFileWriterBuilder; +use iceberg::writer::file_writer::ParquetWriterBuilder; +use iceberg::writer::file_writer::location_generator::{ + DefaultFileNameGenerator, DefaultLocationGenerator, +}; +use iceberg::writer::{IcebergWriter, IcebergWriterBuilder}; +use iceberg::{Catalog, TableCreation}; +use iceberg_catalog_rest::RestCatalog; +use parquet::file::properties::WriterProperties; + +use crate::get_shared_containers; +use crate::shared_tests::random_ns; + +static FILE_NAME_GENERATOR: OnceLock = OnceLock::new(); + +async fn write_new_data_file(table: &Table) -> Vec { + let schema: Arc = Arc::new( + table + .metadata() + .current_schema() + .as_ref() + .try_into() + .unwrap(), + ); + let location_generator = DefaultLocationGenerator::new(table.metadata().clone()).unwrap(); + let file_name_generator = FILE_NAME_GENERATOR.get_or_init(|| { + DefaultFileNameGenerator::new( + "test".to_string(), + None, + iceberg::spec::DataFileFormat::Parquet, + ) + }); + let parquet_writer_builder = ParquetWriterBuilder::new( + WriterProperties::default(), + table.metadata().current_schema().clone(), + table.file_io().clone(), + location_generator.clone(), + file_name_generator.clone(), + ); + let data_file_writer_builder = DataFileWriterBuilder::new( + parquet_writer_builder, + None, + table.metadata().default_partition_spec_id(), + ); + let mut data_file_writer = data_file_writer_builder.build().await.unwrap(); + let col1 = StringArray::from(vec![Some("foo"); 100]); + let col2 = Int32Array::from(vec![Some(1); 100]); + let col3 = BooleanArray::from(vec![Some(true); 100]); + let batch = RecordBatch::try_new(schema.clone(), vec![ + Arc::new(col1) as ArrayRef, + Arc::new(col2) as ArrayRef, + Arc::new(col3) as ArrayRef, + ]) + .unwrap(); + data_file_writer.write(batch.clone()).await.unwrap(); + data_file_writer.close().await.unwrap() +} + +#[tokio::test] +async fn test_append_data_file() { + let fixture = get_shared_containers(); + let rest_catalog = RestCatalog::new(fixture.catalog_config.clone()); + let ns = random_ns().await; + + let schema = Schema::builder() + .with_schema_id(1) + .with_identifier_field_ids(vec![2]) + .with_fields(vec![ + NestedField::optional(1, "foo", Type::Primitive(PrimitiveType::String)).into(), + NestedField::required(2, "bar", Type::Primitive(PrimitiveType::Int)).into(), + NestedField::optional(3, "baz", Type::Primitive(PrimitiveType::Boolean)).into(), + ]) + .build() + .unwrap(); + let table_creation = TableCreation::builder() + .name("t1".to_string()) + .schema(schema.clone()) + .build(); + let mut table = rest_catalog + .create_table(ns.name(), table_creation) + .await + .unwrap(); + + // Enable merge append for table + let tx = Transaction::new(&table); + let update_properties_action = tx + .update_table_properties() + .set(MANIFEST_MERGE_ENABLED.to_string(), "true".to_string()) + .set(MANIFEST_MIN_MERGE_COUNT.to_string(), "4".to_string()) + .set(MANIFEST_TARGET_SIZE_BYTES.to_string(), "8000".to_string()); + let tx = update_properties_action.apply(tx).unwrap(); + table = tx.commit(&rest_catalog).await.unwrap(); + + // fast 
append data file 3 time to create 3 manifest + let mut original_manifest_entries = vec![]; + for _ in 0..3 { + let data_file = write_new_data_file(&table).await; + let tx = Transaction::new(&table); + let append_action = tx.fast_append().add_data_files(data_file.clone()); + let tx = append_action.apply(tx).unwrap(); + table = tx.commit(&rest_catalog).await.unwrap() + } + let manifest_list = table + .metadata() + .current_snapshot() + .unwrap() + .load_manifest_list(table.file_io(), table.metadata()) + .await + .unwrap(); + assert_eq!(manifest_list.entries().len(), 3); + + // construct test data + for (idx, entry) in manifest_list.entries().iter().enumerate() { + let manifest = entry.load_manifest(table.file_io()).await.unwrap(); + assert!(manifest.entries().len() == 1); + + // For this first manifest, it will be pack with the first additional manifest and + // the count(2) is less than the min merge count(4), so these two will not merge. + // See detail: `MergeManifestProcess::merge_group` + if idx == 0 { + original_manifest_entries.push(Arc::new( + ManifestEntry::builder() + .status(ManifestStatus::Added) + .snapshot_id(manifest.entries()[0].snapshot_id().unwrap()) + .sequence_number(manifest.entries()[0].sequence_number().unwrap()) + .file_sequence_number(manifest.entries()[0].file_sequence_number().unwrap()) + .data_file(manifest.entries()[0].data_file().clone()) + .build(), + )); + } else { + original_manifest_entries.push(Arc::new( + ManifestEntry::builder() + .status(ManifestStatus::Existing) + .snapshot_id(manifest.entries()[0].snapshot_id().unwrap()) + .sequence_number(manifest.entries()[0].sequence_number().unwrap()) + .file_sequence_number(manifest.entries()[0].file_sequence_number().unwrap()) + .data_file(manifest.entries()[0].data_file().clone()) + .build(), + )); + } + } + + // append data file with merge append, 4 data file will be merged to two manifest + let data_file = write_new_data_file(&table).await; + let tx = Transaction::new(&table); + let mut merge_append_action = tx.merge_append().unwrap(); + merge_append_action + .add_data_files(data_file.clone()) + .unwrap(); + let tx = merge_append_action.apply(tx).unwrap(); + table = tx.commit(&rest_catalog).await.unwrap(); + // Check manifest file + let manifest_list = table + .metadata() + .current_snapshot() + .unwrap() + .load_manifest_list(table.file_io(), table.metadata()) + .await + .unwrap(); + assert_eq!(manifest_list.entries().len(), 3); + { + let manifest = manifest_list.entries()[1] + .load_manifest(table.file_io()) + .await + .unwrap(); + assert!(manifest.entries().len() == 1); + original_manifest_entries.retain(|entry| !manifest.entries().contains(entry)); + assert!(original_manifest_entries.len() == 2); + } + { + let manifest = manifest_list.entries()[2] + .load_manifest(table.file_io()) + .await + .unwrap(); + assert!(manifest.entries().len() == 2); + for original_entry in original_manifest_entries.iter() { + assert!(manifest.entries().contains(original_entry)); + } + } +} diff --git a/crates/integration_tests/tests/shared_tests/mod.rs b/crates/integration_tests/tests/shared_tests/mod.rs index feb1c4e585..e1836e8590 100644 --- a/crates/integration_tests/tests/shared_tests/mod.rs +++ b/crates/integration_tests/tests/shared_tests/mod.rs @@ -27,6 +27,7 @@ mod append_data_file_test; mod append_partition_data_file_test; mod conflict_commit_test; mod datafusion; +mod merge_append_test; mod read_evolved_schema; mod read_positional_deletes; mod scan_all_type; From 67918fb073b994bf37f8ae652531ee3be41dda10 Mon Sep 17 
00:00:00 2001 From: ZENOTME Date: Sun, 20 Jul 2025 00:23:44 +0800 Subject: [PATCH 2/4] - add parse_property to refine parse property logic - rename FastAppendOperation to AppendOperation - use iterable input for pack to be more memory efficient --- crates/iceberg/src/transaction/append.rs | 6 +-- .../iceberg/src/transaction/merge_append.rs | 41 ++++++++--------- crates/iceberg/src/utils.rs | 44 +++++++++++++++---- 3 files changed, 57 insertions(+), 34 deletions(-) diff --git a/crates/iceberg/src/transaction/append.rs b/crates/iceberg/src/transaction/append.rs index f96d9f73f2..89faa6f406 100644 --- a/crates/iceberg/src/transaction/append.rs +++ b/crates/iceberg/src/transaction/append.rs @@ -103,14 +103,14 @@ impl TransactionAction for FastAppendAction { } snapshot_producer - .commit(FastAppendOperation, DefaultManifestProcess) + .commit(AppendOperation, DefaultManifestProcess) .await } } -pub(crate) struct FastAppendOperation; +pub(crate) struct AppendOperation; -impl SnapshotProduceOperation for FastAppendOperation { +impl SnapshotProduceOperation for AppendOperation { fn operation(&self) -> Operation { Operation::Append } diff --git a/crates/iceberg/src/transaction/merge_append.rs b/crates/iceberg/src/transaction/merge_append.rs index 19f7fa0530..e6c671eee4 100644 --- a/crates/iceberg/src/transaction/merge_append.rs +++ b/crates/iceberg/src/transaction/merge_append.rs @@ -26,10 +26,11 @@ use crate::Result; use crate::io::FileIO; use crate::spec::{DataFile, ManifestContentType, ManifestFile, ManifestStatus, ManifestWriter}; use crate::table::Table; -use crate::transaction::append::FastAppendOperation; +use crate::transaction::append::AppendOperation; use crate::transaction::snapshot::{DefaultManifestProcess, ManifestProcess, SnapshotProducer}; use crate::transaction::{ActionCommit, TransactionAction}; use crate::utils::bin::ListPacker; +use crate::utils::parse_property; /// MergeAppendAction is a transaction action similar to fast append except that it will merge manifests /// based on the target size. 
@@ -57,24 +58,21 @@ const MANIFEST_MERGE_ENABLED_DEFAULT: bool = false; impl MergeAppendAction { pub(crate) fn new(table: &Table) -> Result { - let target_size_bytes: u32 = table - .metadata() - .properties() - .get(MANIFEST_TARGET_SIZE_BYTES) - .and_then(|s| s.parse().ok()) - .unwrap_or(MANIFEST_TARGET_SIZE_BYTES_DEFAULT); - let min_count_to_merge: u32 = table - .metadata() - .properties() - .get(MANIFEST_MIN_MERGE_COUNT) - .and_then(|s| s.parse().ok()) - .unwrap_or(MANIFEST_MIN_MERGE_COUNT_DEFAULT); - let merge_enabled = table - .metadata() - .properties() - .get(MANIFEST_MERGE_ENABLED) - .and_then(|s| s.parse().ok()) - .unwrap_or(MANIFEST_MERGE_ENABLED_DEFAULT); + let target_size_bytes: u32 = parse_property( + table.metadata().properties(), + MANIFEST_TARGET_SIZE_BYTES, + MANIFEST_TARGET_SIZE_BYTES_DEFAULT, + )?; + let min_count_to_merge: u32 = parse_property( + table.metadata().properties(), + MANIFEST_MIN_MERGE_COUNT, + MANIFEST_MIN_MERGE_COUNT_DEFAULT, + )?; + let merge_enabled = parse_property( + table.metadata().properties(), + MANIFEST_MERGE_ENABLED, + MANIFEST_MERGE_ENABLED_DEFAULT, + )?; Ok(Self { check_duplicate: true, target_size_bytes, @@ -140,14 +138,14 @@ impl TransactionAction for MergeAppendAction { if self.merge_enabled { snapshot_producer - .commit(FastAppendOperation, MergeManifsetProcess { + .commit(AppendOperation, MergeManifsetProcess { target_size_bytes: self.target_size_bytes, min_count_to_merge: self.min_count_to_merge, }) .await } else { snapshot_producer - .commit(FastAppendOperation, DefaultManifestProcess) + .commit(AppendOperation, DefaultManifestProcess) .await } } @@ -228,7 +226,6 @@ impl MergeManifestManager { packer.pack(group_manifests, |manifest| manifest.manifest_length as u32); let manifest_merge_futures = manifest_bins - .into_iter() .map(|manifest_bin| { if manifest_bin.len() == 1 { Ok(Box::pin(async { Ok(manifest_bin) }) diff --git a/crates/iceberg/src/utils.rs b/crates/iceberg/src/utils.rs index 1a60c82a77..663adae131 100644 --- a/crates/iceberg/src/utils.rs +++ b/crates/iceberg/src/utils.rs @@ -15,7 +15,11 @@ // specific language governing permissions and limitations // under the License. +use std::collections::HashMap; use std::num::NonZeroUsize; +use std::str::FromStr; + +use crate::{Error, ErrorKind, Result}; // Use a default value of 1 as the safest option. 
// See https://doc.rust-lang.org/std/thread/fn.available_parallelism.html#limitations @@ -41,12 +45,31 @@ pub(crate) fn available_parallelism() -> NonZeroUsize { }) } +pub(crate) fn parse_property( + properties: &HashMap, + key: &str, + default_value: T, +) -> Result +where + T: FromStr, + T::Err: std::error::Error + Send + Sync + 'static, +{ + match properties.get(key) { + Some(s) => s.parse::().map_err(|e| { + Error::new( + ErrorKind::DataInvalid, + format!("Invalid property value for key: {}", key), + ) + .with_source(e) + }), + None => Ok(default_value), + } +} + pub mod bin { use std::iter::Iterator; use std::marker::PhantomData; - use itertools::Itertools; - struct Bin { bin_weight: u32, target_weight: u32, @@ -91,8 +114,11 @@ pub mod bin { } } - pub fn pack(&self, items: Vec, weight_func: F) -> Vec> - where F: Fn(&T) -> u32 { + pub fn pack(&self, items: I, weight_func: F) -> impl Iterator> + where + I: IntoIterator, + F: Fn(&T) -> u32, + { let mut bins: Vec> = vec![]; for item in items { let cur_weight = weight_func(&item); @@ -106,7 +132,7 @@ pub mod bin { addable_bin.add(item, cur_weight); } - bins.into_iter().map(|bin| bin.into_vec()).collect_vec() + bins.into_iter().map(|bin| bin.into_vec()) } } @@ -119,7 +145,7 @@ pub mod bin { let packer = ListPacker::new(10); let items = vec![3, 4, 5, 6, 2, 1]; - let packed = packer.pack(items, |&x| x); + let packed: Vec> = packer.pack(items, |&x| x).collect(); assert_eq!(packed.len(), 3); assert!(packed[0].iter().sum::() == 10); @@ -155,7 +181,7 @@ pub mod bin { }, ]; - let packed = packer.pack(items, |item| item.size); + let packed: Vec> = packer.pack(items, |item| item.size).collect(); assert_eq!(packed.len(), 2); assert!(packed[0].iter().map(|x| x.size).sum::() <= 15); @@ -167,7 +193,7 @@ pub mod bin { let packer = ListPacker::new(10); let items = vec![15, 5, 3]; - let packed = packer.pack(items, |&x| x); + let packed: Vec> = packer.pack(items, |&x| x).collect(); assert_eq!(packed.len(), 2); assert!(packed[0].contains(&15)); @@ -179,7 +205,7 @@ pub mod bin { let packer = ListPacker::new(10); let items: Vec = vec![]; - let packed = packer.pack(items, |&x| x); + let packed: Vec> = packer.pack(items, |&x| x).collect(); assert_eq!(packed.len(), 0); } From af5c4e729e95036f5a5e3cdd4940ce52b940d44d Mon Sep 17 00:00:00 2001 From: ZENOTME Date: Fri, 8 Aug 2025 21:55:47 +0800 Subject: [PATCH 3/4] make interface of merge append action consistent --- crates/iceberg/src/transaction/merge_append.rs | 7 ++----- crates/iceberg/src/transaction/snapshot.rs | 4 +++- .../tests/shared_tests/merge_append_test.rs | 5 +---- 3 files changed, 6 insertions(+), 10 deletions(-) diff --git a/crates/iceberg/src/transaction/merge_append.rs b/crates/iceberg/src/transaction/merge_append.rs index e6c671eee4..f138855f89 100644 --- a/crates/iceberg/src/transaction/merge_append.rs +++ b/crates/iceberg/src/transaction/merge_append.rs @@ -106,12 +106,9 @@ impl MergeAppendAction { } /// Add data files to the snapshot. - pub fn add_data_files( - &mut self, - data_files: impl IntoIterator, - ) -> Result<&mut Self> { + pub fn add_data_files(mut self, data_files: impl IntoIterator) -> Self { self.added_data_files.extend(data_files); - Ok(self) + self } } diff --git a/crates/iceberg/src/transaction/snapshot.rs b/crates/iceberg/src/transaction/snapshot.rs index a2562d19a8..1720837965 100644 --- a/crates/iceberg/src/transaction/snapshot.rs +++ b/crates/iceberg/src/transaction/snapshot.rs @@ -311,7 +311,9 @@ impl<'a> SnapshotProducer<'a> { // # TODO // Support process delete entries. 
-        manifest_process.process_manifests(self, manifest_files).await
+        manifest_process
+            .process_manifests(self, manifest_files)
+            .await
     }
 
     // Returns a `Summary` of the current snapshot
diff --git a/crates/integration_tests/tests/shared_tests/merge_append_test.rs b/crates/integration_tests/tests/shared_tests/merge_append_test.rs
index 366a018d0c..37e6750be2 100644
--- a/crates/integration_tests/tests/shared_tests/merge_append_test.rs
+++ b/crates/integration_tests/tests/shared_tests/merge_append_test.rs
@@ -173,10 +173,7 @@ async fn test_append_data_file() {
     // append data file with merge append, 4 data file will be merged to two manifest
     let data_file = write_new_data_file(&table).await;
     let tx = Transaction::new(&table);
-    let mut merge_append_action = tx.merge_append().unwrap();
-    merge_append_action
-        .add_data_files(data_file.clone())
-        .unwrap();
+    let merge_append_action = tx.merge_append().unwrap().add_data_files(data_file.clone());
     let tx = merge_append_action.apply(tx).unwrap();
     table = tx.commit(&rest_catalog).await.unwrap();
     // Check manifest file

From 59e52f631cc2ed2a5e67aaa9dd8bc53270fcf288 Mon Sep 17 00:00:00 2001
From: ZENOTME
Date: Thu, 14 Aug 2025 21:52:50 +0800
Subject: [PATCH 4/4] - fix test
 - refine comment

---
 .../iceberg/src/transaction/merge_append.rs   | 10 +++--
 crates/iceberg/src/transaction/snapshot.rs    | 11 ++++--
 .../tests/shared_tests/merge_append_test.rs   | 38 ++++++++++---------
 3 files changed, 35 insertions(+), 24 deletions(-)

diff --git a/crates/iceberg/src/transaction/merge_append.rs b/crates/iceberg/src/transaction/merge_append.rs
index f138855f89..b0035bcb25 100644
--- a/crates/iceberg/src/transaction/merge_append.rs
+++ b/crates/iceberg/src/transaction/merge_append.rs
@@ -135,7 +135,7 @@ impl TransactionAction for MergeAppendAction {
 
         if self.merge_enabled {
             snapshot_producer
-                .commit(AppendOperation, MergeManifsetProcess {
+                .commit(AppendOperation, MergeManifestProcess {
                     target_size_bytes: self.target_size_bytes,
                     min_count_to_merge: self.min_count_to_merge,
                 })
@@ -271,6 +271,10 @@ impl MergeManifestManager {
         Ok(merged_bins.into_iter().flatten().collect())
     }
 
+    // Merge algorithm:
+    // 1. Split manifests into groups by partition spec id.
+    // 2. For each group, pack manifests into bins by target size; the sum of the manifest lengths in each bin should stay below the target size.
+    // 3. For the bin that contains the first manifest: if the number of manifests in the bin is less than the min count, don't merge it. Otherwise, merge the bin.
     async fn merge_manifest(
         &self,
         snapshot_produce: &mut SnapshotProducer<'_>,
         manifests: Vec<ManifestFile>,
     ) -> Result<Vec<ManifestFile>> {
@@ -296,12 +300,12 @@ impl MergeManifestManager {
     }
 }
 
-struct MergeManifsetProcess {
+struct MergeManifestProcess {
     target_size_bytes: u32,
     min_count_to_merge: u32,
 }
 
-impl ManifestProcess for MergeManifsetProcess {
+impl ManifestProcess for MergeManifestProcess {
     async fn process_manifests(
         &self,
         snapshot_produce: &mut SnapshotProducer<'_>,
diff --git a/crates/iceberg/src/transaction/snapshot.rs b/crates/iceberg/src/transaction/snapshot.rs
index 1720837965..05192240d2 100644
--- a/crates/iceberg/src/transaction/snapshot.rs
+++ b/crates/iceberg/src/transaction/snapshot.rs
@@ -299,14 +299,17 @@ impl<'a> SnapshotProducer<'a> {
             ));
         }
 
-        let existing_manifests = snapshot_produce_operation.existing_manifest(self).await?;
-        let mut manifest_files = existing_manifests;
-
-        // Process added entries.
+        // # NOTE
+        // The order of manifest files matters:
+        // [added_manifest, ... ]
+        // # TODO
+        // Should we use a type-safe way to guarantee this order?
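+        // (The merge process relies on this ordering: `MergeManifestProcess`
+        // treats the first manifest in the list as the newly added one when
+        // applying the min-count-to-merge rule; see `merge_group`.)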
+        let mut manifest_files = vec![];
         if !self.added_data_files.is_empty() {
             let added_manifest = self.write_added_manifest().await?;
             manifest_files.push(added_manifest);
         }
+        manifest_files.extend(snapshot_produce_operation.existing_manifest(self).await?);
 
         // # TODO
         // Support process delete entries.
diff --git a/crates/integration_tests/tests/shared_tests/merge_append_test.rs b/crates/integration_tests/tests/shared_tests/merge_append_test.rs
index 37e6750be2..e77b83c5dd 100644
--- a/crates/integration_tests/tests/shared_tests/merge_append_test.rs
+++ b/crates/integration_tests/tests/shared_tests/merge_append_test.rs
@@ -111,18 +111,7 @@ async fn test_append_data_file() {
         .await
         .unwrap();
 
-    // Enable merge append for table
-    let tx = Transaction::new(&table);
-    let update_properties_action = tx
-        .update_table_properties()
-        .set(MANIFEST_MERGE_ENABLED.to_string(), "true".to_string())
-        .set(MANIFEST_MIN_MERGE_COUNT.to_string(), "4".to_string())
-        .set(MANIFEST_TARGET_SIZE_BYTES.to_string(), "8000".to_string());
-    let tx = update_properties_action.apply(tx).unwrap();
-    table = tx.commit(&rest_catalog).await.unwrap();
-
     // fast append data file 3 time to create 3 manifest
-    let mut original_manifest_entries = vec![];
     for _ in 0..3 {
         let data_file = write_new_data_file(&table).await;
         let tx = Transaction::new(&table);
@@ -138,15 +127,31 @@ async fn test_append_data_file() {
         .await
         .unwrap();
     assert_eq!(manifest_list.entries().len(), 3);
+
+    // Set the merge size so that every two manifests are packed together.
+    let manifest_file_len = manifest_list.entries().iter().map(|entry| entry.manifest_length).max().unwrap();
+    let tx = Transaction::new(&table);
+    let update_properties_action = tx
+        .update_table_properties()
+        .set(MANIFEST_MERGE_ENABLED.to_string(), "true".to_string())
+        .set(MANIFEST_MIN_MERGE_COUNT.to_string(), "4".to_string())
+        .set(MANIFEST_TARGET_SIZE_BYTES.to_string(), (manifest_file_len * 2 + 2).to_string());
+    let tx = update_properties_action.apply(tx).unwrap();
+    table = tx.commit(&rest_catalog).await.unwrap();
-    // construct test data
+    // Test case:
+    // There are 3 manifests, and we append one more manifest with merge append.
+    // The target size is 2 * manifest_file_len + 2, so every two manifests are packed together.
+    // The first bin contains the newly added manifest and the first original manifest, but its count (2) is less than the min merge count (4), so it is not merged.
+    // The other two manifests are merged into one manifest.
+    // Expect three manifests in the new snapshot:
+    // 1. The new manifest with the newly added data file.
+    // 2. An original manifest with one original data file.
+    // 3. The merged manifest with two original data files.
+    let mut original_manifest_entries = vec![];
     for (idx, entry) in manifest_list.entries().iter().enumerate() {
         let manifest = entry.load_manifest(table.file_io()).await.unwrap();
         assert!(manifest.entries().len() == 1);
-
-        // For this first manifest, it will be pack with the first additional manifest and
-        // the count(2) is less than the min merge count(4), so these two will not merge.
-        // See detail: `MergeManifestProcess::merge_group`
         if idx == 0 {
@@ -192,7 +197,6 @@ async fn test_append_data_file() {
             .unwrap();
         assert!(manifest.entries().len() == 1);
         original_manifest_entries.retain(|entry| !manifest.entries().contains(entry));
-        assert!(original_manifest_entries.len() == 2);
     }
     {
         let manifest = manifest_list.entries()[2]