@@ -24,12 +24,22 @@ use uuid::Uuid;
2424use crate :: error:: Result ;
2525use crate :: spec:: { DataFile , ManifestEntry , ManifestFile , Operation } ;
2626use crate :: transaction:: snapshot:: {
27- DefaultManifestProcess , SnapshotProduceAction , SnapshotProduceOperation ,
27+ DefaultManifestProcess , MergeManifestProcess , SnapshotProduceAction , SnapshotProduceOperation ,
2828} ;
2929use crate :: transaction:: Transaction ;
3030use crate :: writer:: file_writer:: ParquetWriter ;
3131use crate :: { Error , ErrorKind } ;
3232
33+ /// Target size of manifest file when merging manifests.
34+ pub const MANIFEST_TARGET_SIZE_BYTES : & str = "commit.manifest.target-size-bytes" ;
35+ const MANIFEST_TARGET_SIZE_BYTES_DEFAULT : u32 = 8 * 1024 * 1024 ; // 8 MB
36+ /// Minimum number of manifests to merge.
37+ pub const MANIFEST_MIN_MERGE_COUNT : & str = "commit.manifest.min-count-to-merge" ;
38+ const MANIFEST_MIN_MERGE_COUNT_DEFAULT : u32 = 100 ;
39+ /// Whether allow to merge manifests.
40+ pub const MANIFEST_MERGE_ENABLED : & str = "commit.manifest-merge.enabled" ;
41+ const MANIFEST_MERGE_ENABLED_DEFAULT : bool = false ;
42+
3343/// FastAppendAction is a transaction action for fast append data files to the table.
3444pub struct FastAppendAction < ' a > {
3545 snapshot_produce_action : SnapshotProduceAction < ' a > ,
@@ -204,6 +214,84 @@ impl SnapshotProduceOperation for FastAppendOperation {
204214 }
205215}
206216
217+ /// MergeAppendAction is a transaction action similar to fast append except that it will merge manifests
218+ /// based on the target size.
219+ pub struct MergeAppendAction < ' a > {
220+ snapshot_produce_action : SnapshotProduceAction < ' a > ,
221+ target_size_bytes : u32 ,
222+ min_count_to_merge : u32 ,
223+ merge_enabled : bool ,
224+ }
225+
226+ impl < ' a > MergeAppendAction < ' a > {
227+ #[ allow( clippy:: too_many_arguments) ]
228+ pub ( crate ) fn new (
229+ tx : Transaction < ' a > ,
230+ snapshot_id : i64 ,
231+ commit_uuid : Uuid ,
232+ key_metadata : Vec < u8 > ,
233+ snapshot_properties : HashMap < String , String > ,
234+ ) -> Result < Self > {
235+ let target_size_bytes: u32 = tx
236+ . current_table
237+ . metadata ( )
238+ . properties ( )
239+ . get ( MANIFEST_TARGET_SIZE_BYTES )
240+ . and_then ( |s| s. parse ( ) . ok ( ) )
241+ . unwrap_or ( MANIFEST_TARGET_SIZE_BYTES_DEFAULT ) ;
242+ let min_count_to_merge: u32 = tx
243+ . current_table
244+ . metadata ( )
245+ . properties ( )
246+ . get ( MANIFEST_MIN_MERGE_COUNT )
247+ . and_then ( |s| s. parse ( ) . ok ( ) )
248+ . unwrap_or ( MANIFEST_MIN_MERGE_COUNT_DEFAULT ) ;
249+ let merge_enabled = tx
250+ . current_table
251+ . metadata ( )
252+ . properties ( )
253+ . get ( MANIFEST_MERGE_ENABLED )
254+ . and_then ( |s| s. parse ( ) . ok ( ) )
255+ . unwrap_or ( MANIFEST_MERGE_ENABLED_DEFAULT ) ;
256+ Ok ( Self {
257+ snapshot_produce_action : SnapshotProduceAction :: new (
258+ tx,
259+ snapshot_id,
260+ key_metadata,
261+ commit_uuid,
262+ snapshot_properties,
263+ ) ?,
264+ target_size_bytes,
265+ min_count_to_merge,
266+ merge_enabled,
267+ } )
268+ }
269+
270+ /// Add data files to the snapshot.
271+ pub fn add_data_files (
272+ & mut self ,
273+ data_files : impl IntoIterator < Item = DataFile > ,
274+ ) -> Result < & mut Self > {
275+ self . snapshot_produce_action . add_data_files ( data_files) ?;
276+ Ok ( self )
277+ }
278+
279+ /// Finished building the action and apply it to the transaction.
280+ pub async fn apply ( self ) -> Result < Transaction < ' a > > {
281+ if self . merge_enabled {
282+ let process =
283+ MergeManifestProcess :: new ( self . target_size_bytes , self . min_count_to_merge ) ;
284+ self . snapshot_produce_action
285+ . apply ( FastAppendOperation , process)
286+ . await
287+ } else {
288+ self . snapshot_produce_action
289+ . apply ( FastAppendOperation , DefaultManifestProcess )
290+ . await
291+ }
292+ }
293+ }
294+
207295#[ cfg( test) ]
208296mod tests {
209297 use crate :: scan:: tests:: TableTestFixture ;
0 commit comments