diff --git a/crates/iceberg/src/spec/manifest/entry.rs b/crates/iceberg/src/spec/manifest/entry.rs
index 7ba9efb3b9..51b667f022 100644
--- a/crates/iceberg/src/spec/manifest/entry.rs
+++ b/crates/iceberg/src/spec/manifest/entry.rs
@@ -130,6 +130,12 @@ impl ManifestEntry {
         self.snapshot_id
     }
 
+    /// File sequence number.
+    #[inline]
+    pub fn file_sequence_number(&self) -> Option<i64> {
+        self.file_sequence_number
+    }
+
     /// Data sequence number.
     #[inline]
     pub fn sequence_number(&self) -> Option<i64> {
diff --git a/crates/iceberg/src/transaction/append.rs b/crates/iceberg/src/transaction/append.rs
index f248543df2..89faa6f406 100644
--- a/crates/iceberg/src/transaction/append.rs
+++ b/crates/iceberg/src/transaction/append.rs
@@ -103,14 +103,14 @@ impl TransactionAction for FastAppendAction {
         }
 
         snapshot_producer
-            .commit(FastAppendOperation, DefaultManifestProcess)
+            .commit(AppendOperation, DefaultManifestProcess)
             .await
     }
 }
 
-struct FastAppendOperation;
+pub(crate) struct AppendOperation;
 
-impl SnapshotProduceOperation for FastAppendOperation {
+impl SnapshotProduceOperation for AppendOperation {
     fn operation(&self) -> Operation {
         Operation::Append
     }
diff --git a/crates/iceberg/src/transaction/merge_append.rs b/crates/iceberg/src/transaction/merge_append.rs
new file mode 100644
index 0000000000..b0035bcb25
--- /dev/null
+++ b/crates/iceberg/src/transaction/merge_append.rs
@@ -0,0 +1,330 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::collections::{BTreeMap, HashMap};
+use std::future::Future;
+use std::pin::Pin;
+use std::sync::Arc;
+
+use async_trait::async_trait;
+use uuid::Uuid;
+
+use crate::Result;
+use crate::io::FileIO;
+use crate::spec::{DataFile, ManifestContentType, ManifestFile, ManifestStatus, ManifestWriter};
+use crate::table::Table;
+use crate::transaction::append::AppendOperation;
+use crate::transaction::snapshot::{DefaultManifestProcess, ManifestProcess, SnapshotProducer};
+use crate::transaction::{ActionCommit, TransactionAction};
+use crate::utils::bin::ListPacker;
+use crate::utils::parse_property;
+
+/// MergeAppendAction is a transaction action similar to fast append, except that it merges
+/// manifests based on the target size.
+pub struct MergeAppendAction {
+    target_size_bytes: u32,
+    min_count_to_merge: u32,
+    check_duplicate: bool,
+    merge_enabled: bool,
+    // Properties used to create the SnapshotProducer at commit time.
+    commit_uuid: Option<Uuid>,
+    key_metadata: Option<Vec<u8>>,
+    snapshot_properties: HashMap<String, String>,
+    added_data_files: Vec<DataFile>,
+}
+
+/// Target size of a manifest file when merging manifests.
+pub const MANIFEST_TARGET_SIZE_BYTES: &str = "commit.manifest.target-size-bytes";
+const MANIFEST_TARGET_SIZE_BYTES_DEFAULT: u32 = 8 * 1024 * 1024; // 8 MB
+/// Minimum number of manifests to merge.
+pub const MANIFEST_MIN_MERGE_COUNT: &str = "commit.manifest.min-count-to-merge";
+const MANIFEST_MIN_MERGE_COUNT_DEFAULT: u32 = 100;
+/// Whether merging manifests is enabled.
+pub const MANIFEST_MERGE_ENABLED: &str = "commit.manifest-merge.enabled";
+const MANIFEST_MERGE_ENABLED_DEFAULT: bool = false;
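+
+// A minimal sketch of how these properties are typically set (mirroring the integration
+// test below); `table` and `catalog` are assumed to already exist:
+//
+//     let tx = Transaction::new(&table);
+//     let action = tx
+//         .update_table_properties()
+//         .set(MANIFEST_MERGE_ENABLED.to_string(), "true".to_string())
+//         .set(MANIFEST_TARGET_SIZE_BYTES.to_string(), "8388608".to_string());
+//     let tx = action.apply(tx)?;
+//     let table = tx.commit(&catalog).await?;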
+
+impl MergeAppendAction {
+    pub(crate) fn new(table: &Table) -> Result<Self> {
+        let target_size_bytes: u32 = parse_property(
+            table.metadata().properties(),
+            MANIFEST_TARGET_SIZE_BYTES,
+            MANIFEST_TARGET_SIZE_BYTES_DEFAULT,
+        )?;
+        let min_count_to_merge: u32 = parse_property(
+            table.metadata().properties(),
+            MANIFEST_MIN_MERGE_COUNT,
+            MANIFEST_MIN_MERGE_COUNT_DEFAULT,
+        )?;
+        let merge_enabled = parse_property(
+            table.metadata().properties(),
+            MANIFEST_MERGE_ENABLED,
+            MANIFEST_MERGE_ENABLED_DEFAULT,
+        )?;
+        Ok(Self {
+            check_duplicate: true,
+            target_size_bytes,
+            min_count_to_merge,
+            merge_enabled,
+            commit_uuid: None,
+            key_metadata: None,
+            snapshot_properties: HashMap::default(),
+            added_data_files: vec![],
+        })
+    }
+
+    /// Whether to check for duplicate files before committing.
+    pub fn with_check_duplicate(mut self, v: bool) -> Self {
+        self.check_duplicate = v;
+        self
+    }
+
+    /// Set the commit UUID for the snapshot.
+    pub fn set_commit_uuid(mut self, commit_uuid: Uuid) -> Self {
+        self.commit_uuid = Some(commit_uuid);
+        self
+    }
+
+    /// Set the key metadata for the manifest files.
+    pub fn set_key_metadata(mut self, key_metadata: Vec<u8>) -> Self {
+        self.key_metadata = Some(key_metadata);
+        self
+    }
+
+    /// Set the snapshot summary properties.
+    pub fn set_snapshot_properties(mut self, snapshot_properties: HashMap<String, String>) -> Self {
+        self.snapshot_properties = snapshot_properties;
+        self
+    }
+
+    /// Add data files to the snapshot.
+    pub fn add_data_files(mut self, data_files: impl IntoIterator<Item = DataFile>) -> Self {
+        self.added_data_files.extend(data_files);
+        self
+    }
+}
+
+#[async_trait]
+impl TransactionAction for MergeAppendAction {
+    async fn commit(self: Arc<Self>, table: &Table) -> Result<ActionCommit> {
+        let snapshot_producer = SnapshotProducer::new(
+            table,
+            self.commit_uuid.unwrap_or_else(Uuid::now_v7),
+            self.key_metadata.clone(),
+            self.snapshot_properties.clone(),
+            self.added_data_files.clone(),
+        );
+
+        // Validate the added data files.
+        snapshot_producer.validate_added_data_files(&self.added_data_files)?;
+
+        // Check for duplicate files.
+        if self.check_duplicate {
+            snapshot_producer
+                .validate_duplicate_files(&self.added_data_files)
+                .await?;
+        }
+
+        if self.merge_enabled {
+            snapshot_producer
+                .commit(AppendOperation, MergeManifestProcess {
+                    target_size_bytes: self.target_size_bytes,
+                    min_count_to_merge: self.min_count_to_merge,
+                })
+                .await
+        } else {
+            snapshot_producer
+                .commit(AppendOperation, DefaultManifestProcess)
+                .await
+        }
+    }
+}
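+
+// End-to-end usage sketch, mirroring the integration test; `table`, `catalog`, and
+// `data_files` are assumed to already exist:
+//
+//     let tx = Transaction::new(&table);
+//     let action = tx.merge_append()?.add_data_files(data_files);
+//     let tx = action.apply(tx)?;
+//     let table = tx.commit(&catalog).await?;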
+
+struct MergeManifestManager {
+    target_size_bytes: u32,
+    min_count_to_merge: u32,
+    content: ManifestContentType,
+}
+
+impl MergeManifestManager {
+    pub fn new(
+        target_size_bytes: u32,
+        min_count_to_merge: u32,
+        content: ManifestContentType,
+    ) -> Self {
+        Self {
+            target_size_bytes,
+            min_count_to_merge,
+            content,
+        }
+    }
+
+    fn group_by_spec(&self, manifests: Vec<ManifestFile>) -> BTreeMap<i32, Vec<ManifestFile>> {
+        let mut grouped_manifests = BTreeMap::new();
+        for manifest in manifests {
+            grouped_manifests
+                .entry(manifest.partition_spec_id)
+                .or_insert_with(Vec::new)
+                .push(manifest);
+        }
+        grouped_manifests
+    }
+
+    async fn merge_bin(
+        &self,
+        snapshot_id: i64,
+        file_io: FileIO,
+        manifest_bin: Vec<ManifestFile>,
+        mut writer: ManifestWriter,
+    ) -> Result<ManifestFile> {
+        for manifest_file in manifest_bin {
+            let manifest = manifest_file.load_manifest(&file_io).await?;
+            for manifest_entry in manifest.entries() {
+                if manifest_entry.status() == ManifestStatus::Deleted
+                    && manifest_entry
+                        .snapshot_id()
+                        .is_some_and(|id| id == snapshot_id)
+                {
+                    // Only files deleted by this snapshot should be added to the new manifest.
+                    writer.add_delete_entry(manifest_entry.as_ref().clone())?;
+                } else if manifest_entry.status() == ManifestStatus::Added
+                    && manifest_entry
+                        .snapshot_id()
+                        .is_some_and(|id| id == snapshot_id)
+                {
+                    // Entries added by this snapshot stay added; otherwise they become existing.
+                    writer.add_entry(manifest_entry.as_ref().clone())?;
+                } else if manifest_entry.status() != ManifestStatus::Deleted {
+                    // Add all non-deleted files from the old manifest as existing files.
+                    writer.add_existing_entry(manifest_entry.as_ref().clone())?;
+                }
+            }
+        }
+
+        writer.write_manifest_file().await
+    }
+
+    async fn merge_group(
+        &self,
+        snapshot_produce: &mut SnapshotProducer<'_>,
+        first_manifest: &ManifestFile,
+        group_manifests: Vec<ManifestFile>,
+    ) -> Result<Vec<ManifestFile>> {
+        let packer: ListPacker<ManifestFile> = ListPacker::new(self.target_size_bytes);
+        let manifest_bins =
+            packer.pack(group_manifests, |manifest| manifest.manifest_length as u32);
+
+        let manifest_merge_futures = manifest_bins
+            .map(|manifest_bin| {
+                if manifest_bin.len() == 1 {
+                    Ok(Box::pin(async { Ok(manifest_bin) })
+                        as Pin<
+                            Box<dyn Future<Output = Result<Vec<ManifestFile>>> + Send>,
+                        >)
+                }
+                // If the bin contains the first manifest (the one holding the newly added data
+                // files), only merge it when the number of manifests reaches the minimum count.
+                // This check applies only to bins with an in-memory manifest, so that large
+                // manifests don't prevent merging older groups.
+                else if manifest_bin
+                    .iter()
+                    .any(|manifest| manifest == first_manifest)
+                    && manifest_bin.len() < self.min_count_to_merge as usize
+                {
+                    Ok(Box::pin(async { Ok(manifest_bin) })
+                        as Pin<
+                            Box<dyn Future<Output = Result<Vec<ManifestFile>>> + Send>,
+                        >)
+                } else {
+                    let writer = snapshot_produce.new_manifest_writer(self.content)?;
+                    let snapshot_id = snapshot_produce.snapshot_id;
+                    let file_io = snapshot_produce.table.file_io().clone();
+                    Ok((Box::pin(async move {
+                        Ok(vec![
+                            self.merge_bin(snapshot_id, file_io, manifest_bin, writer)
+                                .await?,
+                        ])
+                    }))
+                        as Pin<Box<dyn Future<Output = Result<Vec<ManifestFile>>> + Send>>)
+                }
+            })
+            .collect::<Result<Vec<Pin<Box<dyn Future<Output = Result<Vec<ManifestFile>>> + Send>>>>>()?;
+
+        let merged_bins: Vec<Vec<ManifestFile>> =
+            futures::future::join_all(manifest_merge_futures.into_iter())
+                .await
+                .into_iter()
+                .collect::<Result<_>>()?;
+
+        Ok(merged_bins.into_iter().flatten().collect())
+    }
+
+    // Merge algorithm:
+    // 1. Split manifests into groups by partition spec id.
+    // 2. For each group, pack manifests into bins by target size; the total manifest length
+    //    in each bin should not exceed the target size.
+    // 3. For the bin containing the first manifest, merge it only if the number of manifests
+    //    in the bin reaches the minimum count; otherwise leave it unmerged. (See the worked
+    //    example below.)
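+    // A small worked example (hypothetical sizes): with target_size_bytes = 10 MB,
+    // min_count_to_merge = 3, and manifests of [4 MB (first), 4 MB, 7 MB, 2 MB],
+    // first-fit packing yields bins [4, 4, 2] and [7]. The first bin contains the
+    // first manifest and reaches the minimum count (3), so it is merged into one
+    // manifest; the single-manifest bin [7] is passed through unchanged.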
+    async fn merge_manifest(
+        &self,
+        snapshot_produce: &mut SnapshotProducer<'_>,
+        manifests: Vec<ManifestFile>,
+    ) -> Result<Vec<ManifestFile>> {
+        if manifests.is_empty() {
+            return Ok(manifests);
+        }
+
+        let first_manifest = manifests[0].clone();
+
+        let group_manifests = self.group_by_spec(manifests);
+
+        let mut merge_manifests = vec![];
+        for (_spec_id, manifests) in group_manifests.into_iter().rev() {
+            merge_manifests.extend(
+                self.merge_group(snapshot_produce, &first_manifest, manifests)
+                    .await?,
+            );
+        }
+
+        Ok(merge_manifests)
+    }
+}
+
+struct MergeManifestProcess {
+    target_size_bytes: u32,
+    min_count_to_merge: u32,
+}
+
+impl ManifestProcess for MergeManifestProcess {
+    async fn process_manifests(
+        &self,
+        snapshot_produce: &mut SnapshotProducer<'_>,
+        manifests: Vec<ManifestFile>,
+    ) -> Result<Vec<ManifestFile>> {
+        let (unmerged_data_manifests, unmerged_delete_manifests): (Vec<_>, Vec<_>) = manifests
+            .into_iter()
+            .partition(|m| m.content == ManifestContentType::Data);
+        let mut data_manifests = {
+            let merge_manifest_manager = MergeManifestManager::new(
+                self.target_size_bytes,
+                self.min_count_to_merge,
+                ManifestContentType::Data,
+            );
+            merge_manifest_manager
+                .merge_manifest(snapshot_produce, unmerged_data_manifests)
+                .await?
+        };
+        data_manifests.extend(unmerged_delete_manifests);
+        Ok(data_manifests)
+    }
+}
diff --git a/crates/iceberg/src/transaction/mod.rs b/crates/iceberg/src/transaction/mod.rs
index 06549a95c5..170e2f1815 100644
--- a/crates/iceberg/src/transaction/mod.rs
+++ b/crates/iceberg/src/transaction/mod.rs
@@ -56,6 +56,7 @@ use std::collections::HashMap;
 pub use action::*;
 
 mod append;
+mod merge_append;
 mod snapshot;
 mod sort_order;
 mod update_location;
@@ -78,6 +79,10 @@ use crate::spec::{
 use crate::table::Table;
 use crate::transaction::action::BoxedTransactionAction;
 use crate::transaction::append::FastAppendAction;
+use crate::transaction::merge_append::MergeAppendAction;
+pub use crate::transaction::merge_append::{
+    MANIFEST_MERGE_ENABLED, MANIFEST_MIN_MERGE_COUNT, MANIFEST_TARGET_SIZE_BYTES,
+};
 use crate::transaction::sort_order::ReplaceSortOrderAction;
 use crate::transaction::update_location::UpdateLocationAction;
 use crate::transaction::update_properties::UpdatePropertiesAction;
@@ -148,6 +153,11 @@ impl Transaction {
         FastAppendAction::new()
     }
 
+    /// Creates a merge append action.
+    pub fn merge_append(&self) -> Result<MergeAppendAction> {
+        MergeAppendAction::new(&self.table)
+    }
+
     /// Creates replace sort order action.
     pub fn replace_sort_order(&self) -> ReplaceSortOrderAction {
         ReplaceSortOrderAction::new()
diff --git a/crates/iceberg/src/transaction/snapshot.rs b/crates/iceberg/src/transaction/snapshot.rs
index 48dc2b5b90..05192240d2 100644
--- a/crates/iceberg/src/transaction/snapshot.rs
+++ b/crates/iceberg/src/transaction/snapshot.rs
@@ -51,26 +51,26 @@ pub(crate) trait SnapshotProduceOperation: Send + Sync {
 pub(crate) struct DefaultManifestProcess;
 
 impl ManifestProcess for DefaultManifestProcess {
-    fn process_manifests(
+    async fn process_manifests(
         &self,
-        _snapshot_produce: &SnapshotProducer<'_>,
+        _snapshot_produce: &mut SnapshotProducer<'_>,
         manifests: Vec<ManifestFile>,
-    ) -> Vec<ManifestFile> {
-        manifests
+    ) -> Result<Vec<ManifestFile>> {
+        Ok(manifests)
     }
 }
 
 pub(crate) trait ManifestProcess: Send + Sync {
-    fn process_manifests(
+    async fn process_manifests(
        &self,
-        snapshot_produce: &SnapshotProducer<'_>,
+        snapshot_produce: &mut SnapshotProducer<'_>,
         manifests: Vec<ManifestFile>,
-    ) -> Vec<ManifestFile>;
+    ) -> Result<Vec<ManifestFile>>;
 }
 
 pub(crate) struct SnapshotProducer<'a> {
     pub(crate) table: &'a Table,
-    snapshot_id: i64,
+    pub(crate) snapshot_id: i64,
     commit_uuid: Uuid,
     key_metadata: Option<Vec<u8>>,
     snapshot_properties: HashMap<String, String>,
@@ -186,7 +186,10 @@ impl<'a> SnapshotProducer<'a> {
         snapshot_id
     }
 
-    fn new_manifest_writer(&mut self, content: ManifestContentType) -> Result<ManifestWriter> {
+    pub(crate) fn new_manifest_writer(
+        &mut self,
+        content: ManifestContentType,
+    ) -> Result<ManifestWriter> {
         let new_manifest_path = format!(
             "{}/{}/{}-m{}.{}",
             self.table.metadata().location(),
@@ -296,20 +299,24 @@ impl<'a> SnapshotProducer<'a> {
             ));
         }
 
-        let existing_manifests = snapshot_produce_operation.existing_manifest(self).await?;
-        let mut manifest_files = existing_manifests;
-
-        // Process added entries.
+        // # NOTE
+        // The order of the manifest files matters:
+        // [added_manifest, ...]
+        // # TODO
+        // Should we guarantee this order in a type-safe way?
+        let mut manifest_files = vec![];
         if !self.added_data_files.is_empty() {
             let added_manifest = self.write_added_manifest().await?;
             manifest_files.push(added_manifest);
         }
+        manifest_files.extend(snapshot_produce_operation.existing_manifest(self).await?);
 
         // # TODO
         // Support process delete entries.
 
-        let manifest_files = manifest_process.process_manifests(self, manifest_files);
-        Ok(manifest_files)
+        manifest_process
+            .process_manifests(self, manifest_files)
+            .await
     }
 
     // Returns a `Summary` of the current snapshot
diff --git a/crates/iceberg/src/utils.rs b/crates/iceberg/src/utils.rs
index 00d3e69bd3..663adae131 100644
--- a/crates/iceberg/src/utils.rs
+++ b/crates/iceberg/src/utils.rs
@@ -15,7 +15,11 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use std::collections::HashMap;
 use std::num::NonZeroUsize;
+use std::str::FromStr;
+
+use crate::{Error, ErrorKind, Result};
 
 // Use a default value of 1 as the safest option.
 // See https://doc.rust-lang.org/std/thread/fn.available_parallelism.html#limitations
@@ -40,3 +44,170 @@ pub(crate) fn available_parallelism() -> NonZeroUsize {
         NonZeroUsize::new(DEFAULT_PARALLELISM).unwrap()
     })
 }
+
+pub(crate) fn parse_property<T>(
+    properties: &HashMap<String, String>,
+    key: &str,
+    default_value: T,
+) -> Result<T>
+where
+    T: FromStr,
+    T::Err: std::error::Error + Send + Sync + 'static,
+{
+    match properties.get(key) {
+        Some(s) => s.parse::<T>().map_err(|e| {
+            Error::new(
+                ErrorKind::DataInvalid,
+                format!("Invalid property value for key: {}", key),
+            )
+            .with_source(e)
+        }),
+        None => Ok(default_value),
+    }
+}
+
+pub mod bin {
+    use std::iter::Iterator;
+    use std::marker::PhantomData;
+
+    struct Bin<T> {
+        bin_weight: u32,
+        target_weight: u32,
+        items: Vec<T>,
+    }
+
+    impl<T> Bin<T> {
+        pub fn new(target_weight: u32) -> Self {
+            Bin {
+                bin_weight: 0,
+                target_weight,
+                items: Vec::new(),
+            }
+        }
+
+        pub fn can_add(&self, weight: u32) -> bool {
+            self.bin_weight + weight <= self.target_weight
+        }
+
+        pub fn add(&mut self, item: T, weight: u32) {
+            self.bin_weight += weight;
+            self.items.push(item);
+        }
+
+        pub fn into_vec(self) -> Vec<T> {
+            self.items
+        }
+    }
+
+    /// ListPacker packs items into bins. The total weight of each bin stays close to
+    /// `target_weight`.
+    pub(crate) struct ListPacker<T> {
+        target_weight: u32,
+        _marker: PhantomData<T>,
+    }
+
+    impl<T> ListPacker<T> {
+        pub fn new(target_weight: u32) -> Self {
+            ListPacker {
+                target_weight,
+                _marker: PhantomData,
+            }
+        }
+
+        /// Packs items using first-fit: each item goes into the first bin that can still
+        /// take its weight; if none can, a new bin is opened.
+        pub fn pack<I, F>(&self, items: I, weight_func: F) -> impl Iterator<Item = Vec<T>>
+        where
+            I: IntoIterator<Item = T>,
+            F: Fn(&T) -> u32,
+        {
+            let mut bins: Vec<Bin<T>> = vec![];
+            for item in items {
+                let cur_weight = weight_func(&item);
+                let addable_bin =
+                    if let Some(bin) = bins.iter_mut().find(|bin| bin.can_add(cur_weight)) {
+                        bin
+                    } else {
+                        bins.push(Bin::new(self.target_weight));
+                        bins.last_mut().unwrap()
+                    };
+                addable_bin.add(item, cur_weight);
+            }
+
+            bins.into_iter().map(|bin| bin.into_vec())
+        }
+    }
+
+    #[cfg(test)]
+    mod tests {
+        use super::*;
+
+        #[test]
+        fn test_list_packer_basic_packing() {
+            let packer = ListPacker::new(10);
+            let items = vec![3, 4, 5, 6, 2, 1];
+
+            let packed: Vec<Vec<u32>> = packer.pack(items, |&x| x).collect();
+
+            assert_eq!(packed.len(), 3);
+            assert!(packed[0].iter().sum::<u32>() == 10);
+            assert!(packed[1].iter().sum::<u32>() == 5);
+            assert!(packed[2].iter().sum::<u32>() == 6);
+        }
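+
+        // First-fit trace for the assertions above: 3 and 4 land in bin 1 (weight 7);
+        // 5 no longer fits there, opening bin 2; 6 fits neither bin 1 (7 + 6 > 10) nor
+        // bin 2 (5 + 6 > 10), opening bin 3; 2 and 1 then top bin 1 up to exactly 10.
+        // Hence the per-bin sums 10, 5, and 6.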
+
+        #[test]
+        fn test_list_packer_with_complex_items() {
+            #[derive(Debug, PartialEq)]
+            struct Item {
+                name: String,
+                size: u32,
+            }
+
+            let packer = ListPacker::new(15);
+            let items = vec![
+                Item {
+                    name: "A".to_string(),
+                    size: 7,
+                },
+                Item {
+                    name: "B".to_string(),
+                    size: 8,
+                },
+                Item {
+                    name: "C".to_string(),
+                    size: 5,
+                },
+                Item {
+                    name: "D".to_string(),
+                    size: 6,
+                },
+            ];
+
+            let packed: Vec<Vec<Item>> = packer.pack(items, |item| item.size).collect();
+
+            assert_eq!(packed.len(), 2);
+            assert!(packed[0].iter().map(|x| x.size).sum::<u32>() <= 15);
+            assert!(packed[1].iter().map(|x| x.size).sum::<u32>() <= 15);
+        }
+
+        #[test]
+        fn test_list_packer_single_large_item() {
+            let packer = ListPacker::new(10);
+            let items = vec![15, 5, 3];
+
+            let packed: Vec<Vec<u32>> = packer.pack(items, |&x| x).collect();
+
+            assert_eq!(packed.len(), 2);
+            assert!(packed[0].contains(&15));
+            assert!(packed[1].iter().sum::<u32>() <= 10);
+        }
+
+        #[test]
+        fn test_list_packer_empty_input() {
+            let packer = ListPacker::new(10);
+            let items: Vec<u32> = vec![];
+
+            let packed: Vec<Vec<u32>> = packer.pack(items, |&x| x).collect();
+
+            assert_eq!(packed.len(), 0);
+        }
+    }
+}
diff --git a/crates/integration_tests/tests/shared_tests/merge_append_test.rs b/crates/integration_tests/tests/shared_tests/merge_append_test.rs
new file mode 100644
index 0000000000..e77b83c5dd
--- /dev/null
+++ b/crates/integration_tests/tests/shared_tests/merge_append_test.rs
@@ -0,0 +1,211 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Integration tests for the REST catalog.
+
+use std::sync::{Arc, OnceLock};
+
+use arrow_array::{ArrayRef, BooleanArray, Int32Array, RecordBatch, StringArray};
+use iceberg::spec::{
+    DataFile, ManifestEntry, ManifestStatus, NestedField, PrimitiveType, Schema, Type,
+};
+use iceberg::table::Table;
+use iceberg::transaction::{
+    ApplyTransactionAction, MANIFEST_MERGE_ENABLED, MANIFEST_MIN_MERGE_COUNT,
+    MANIFEST_TARGET_SIZE_BYTES, Transaction,
+};
+use iceberg::writer::base_writer::data_file_writer::DataFileWriterBuilder;
+use iceberg::writer::file_writer::ParquetWriterBuilder;
+use iceberg::writer::file_writer::location_generator::{
+    DefaultFileNameGenerator, DefaultLocationGenerator,
+};
+use iceberg::writer::{IcebergWriter, IcebergWriterBuilder};
+use iceberg::{Catalog, TableCreation};
+use iceberg_catalog_rest::RestCatalog;
+use parquet::file::properties::WriterProperties;
+
+use crate::get_shared_containers;
+use crate::shared_tests::random_ns;
+
+static FILE_NAME_GENERATOR: OnceLock<DefaultFileNameGenerator> = OnceLock::new();
+
+async fn write_new_data_file(table: &Table) -> Vec<DataFile> {
+    let schema: Arc<arrow_schema::Schema> = Arc::new(
+        table
+            .metadata()
+            .current_schema()
+            .as_ref()
+            .try_into()
+            .unwrap(),
+    );
+    let location_generator = DefaultLocationGenerator::new(table.metadata().clone()).unwrap();
+    let file_name_generator = FILE_NAME_GENERATOR.get_or_init(|| {
+        DefaultFileNameGenerator::new(
+            "test".to_string(),
+            None,
+            iceberg::spec::DataFileFormat::Parquet,
+        )
+    });
+    let parquet_writer_builder = ParquetWriterBuilder::new(
+        WriterProperties::default(),
+        table.metadata().current_schema().clone(),
+        table.file_io().clone(),
+        location_generator.clone(),
+        file_name_generator.clone(),
+    );
+    let data_file_writer_builder = DataFileWriterBuilder::new(
+        parquet_writer_builder,
+        None,
+        table.metadata().default_partition_spec_id(),
+    );
+    let mut data_file_writer = data_file_writer_builder.build().await.unwrap();
+    let col1 = StringArray::from(vec![Some("foo"); 100]);
+    let col2 = Int32Array::from(vec![Some(1); 100]);
+    let col3 = BooleanArray::from(vec![Some(true); 100]);
+    let batch = RecordBatch::try_new(schema.clone(), vec![
+        Arc::new(col1) as ArrayRef,
+        Arc::new(col2) as ArrayRef,
+        Arc::new(col3) as ArrayRef,
+    ])
+    .unwrap();
+    data_file_writer.write(batch.clone()).await.unwrap();
+    data_file_writer.close().await.unwrap()
+}
+
+#[tokio::test]
+async fn test_append_data_file() {
+    let fixture = get_shared_containers();
+    let rest_catalog = RestCatalog::new(fixture.catalog_config.clone());
+    let ns = random_ns().await;
+
+    let schema = Schema::builder()
+        .with_schema_id(1)
+        .with_identifier_field_ids(vec![2])
+        .with_fields(vec![
+            NestedField::optional(1, "foo", Type::Primitive(PrimitiveType::String)).into(),
+            NestedField::required(2, "bar", Type::Primitive(PrimitiveType::Int)).into(),
+            NestedField::optional(3, "baz", Type::Primitive(PrimitiveType::Boolean)).into(),
+        ])
+        .build()
+        .unwrap();
+    let table_creation = TableCreation::builder()
+        .name("t1".to_string())
+        .schema(schema.clone())
+        .build();
+    let mut table = rest_catalog
+        .create_table(ns.name(), table_creation)
+        .await
+        .unwrap();
+
+    // Fast-append a data file 3 times to create 3 manifests.
+    for _ in 0..3 {
+        let data_file = write_new_data_file(&table).await;
+        let tx = Transaction::new(&table);
+        let append_action = tx.fast_append().add_data_files(data_file.clone());
+        let tx = append_action.apply(tx).unwrap();
+        table = tx.commit(&rest_catalog).await.unwrap()
+    }
+    let manifest_list = table
+        .metadata()
+        .current_snapshot()
+        .unwrap()
+        .load_manifest_list(table.file_io(), table.metadata())
+        .await
+        .unwrap();
+    assert_eq!(manifest_list.entries().len(), 3);
+
+    // Set the merge target size so that every two manifests get packed together.
+    let manifest_file_len = manifest_list
+        .entries()
+        .iter()
+        .map(|entry| entry.manifest_length)
+        .max()
+        .unwrap();
+    let tx = Transaction::new(&table);
+    let update_properties_action = tx
+        .update_table_properties()
+        .set(MANIFEST_MERGE_ENABLED.to_string(), "true".to_string())
+        .set(MANIFEST_MIN_MERGE_COUNT.to_string(), "4".to_string())
+        .set(
+            MANIFEST_TARGET_SIZE_BYTES.to_string(),
+            (manifest_file_len * 2 + 2).to_string(),
+        );
+    let tx = update_properties_action.apply(tx).unwrap();
+    table = tx.commit(&rest_catalog).await.unwrap();
+
+    // Test case:
+    // There are 3 manifests, and we append one more manifest with merge append.
+    // The target size is 2 * manifest_file_len + 2, so every two manifests get packed together.
+    // The first bin contains the new manifest and the first original manifest, but its count
+    // is less than the min merge count, so it is not merged.
+    // The other two manifests are merged into one manifest.
+    // Expect three manifests in the new snapshot:
+    // 1. The new manifest with the newly added data file.
+    // 2. An original manifest with one original data file.
+    // 3. The merged manifest with two original data files.
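+    // Bin math behind the expectation: the manifests arrive as [new_manifest, m1, m2, m3]
+    // with a target of 2 * manifest_file_len + 2, so first-fit packing yields bins
+    // [new_manifest, m1] and [m2, m3]. The first bin holds the in-memory manifest and
+    // 2 < 4 (the min merge count), so it passes through unmerged; [m2, m3] is merged
+    // into a single manifest.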
+    let mut original_manifest_entries = vec![];
+    for (idx, entry) in manifest_list.entries().iter().enumerate() {
+        let manifest = entry.load_manifest(table.file_io()).await.unwrap();
+        assert!(manifest.entries().len() == 1);
+        if idx == 0 {
+            // The first original manifest lands in the unmerged bin, so its entry keeps
+            // the `Added` status.
+            original_manifest_entries.push(Arc::new(
+                ManifestEntry::builder()
+                    .status(ManifestStatus::Added)
+                    .snapshot_id(manifest.entries()[0].snapshot_id().unwrap())
+                    .sequence_number(manifest.entries()[0].sequence_number().unwrap())
+                    .file_sequence_number(manifest.entries()[0].file_sequence_number().unwrap())
+                    .data_file(manifest.entries()[0].data_file().clone())
+                    .build(),
+            ));
+        } else {
+            // Entries rewritten into the merged manifest become `Existing`.
+            original_manifest_entries.push(Arc::new(
+                ManifestEntry::builder()
+                    .status(ManifestStatus::Existing)
+                    .snapshot_id(manifest.entries()[0].snapshot_id().unwrap())
+                    .sequence_number(manifest.entries()[0].sequence_number().unwrap())
+                    .file_sequence_number(manifest.entries()[0].file_sequence_number().unwrap())
+                    .data_file(manifest.entries()[0].data_file().clone())
+                    .build(),
+            ));
+        }
+    }
+
+    // Append a data file with merge append; the 4 manifests are packed two per bin as
+    // described above.
+    let data_file = write_new_data_file(&table).await;
+    let tx = Transaction::new(&table);
+    let merge_append_action = tx.merge_append().unwrap().add_data_files(data_file.clone());
+    let tx = merge_append_action.apply(tx).unwrap();
+    table = tx.commit(&rest_catalog).await.unwrap();
+    // Check the manifest files.
+    let manifest_list = table
+        .metadata()
+        .current_snapshot()
+        .unwrap()
+        .load_manifest_list(table.file_io(), table.metadata())
+        .await
+        .unwrap();
+    assert_eq!(manifest_list.entries().len(), 3);
+    {
+        let manifest = manifest_list.entries()[1]
+            .load_manifest(table.file_io())
+            .await
+            .unwrap();
+        assert!(manifest.entries().len() == 1);
+        original_manifest_entries.retain(|entry| !manifest.entries().contains(entry));
+    }
+    {
+        let manifest = manifest_list.entries()[2]
+            .load_manifest(table.file_io())
+            .await
+            .unwrap();
+        assert!(manifest.entries().len() == 2);
+        for original_entry in original_manifest_entries.iter() {
+            assert!(manifest.entries().contains(original_entry));
+        }
+    }
+}
diff --git a/crates/integration_tests/tests/shared_tests/mod.rs b/crates/integration_tests/tests/shared_tests/mod.rs
index feb1c4e585..e1836e8590 100644
--- a/crates/integration_tests/tests/shared_tests/mod.rs
+++ b/crates/integration_tests/tests/shared_tests/mod.rs
@@ -27,6 +27,7 @@ mod append_data_file_test;
 mod append_partition_data_file_test;
 mod conflict_commit_test;
 mod datafusion;
+mod merge_append_test;
 mod read_evolved_schema;
 mod read_positional_deletes;
 mod scan_all_type;