@@ -168,6 +168,7 @@ impl<'a> SnapshotProduceAction<'a> {
168168 fn new_manifest_writer (
169169 & mut self ,
170170 content_type : & ManifestContentType ,
171+ partition_spec_id : i32 ,
171172 ) -> Result < ManifestWriter > {
172173 let new_manifest_path = format ! (
173174 "{}/{}/{}-m{}.{}" ,
@@ -182,17 +183,26 @@ impl<'a> SnapshotProduceAction<'a> {
182183 . current_table
183184 . file_io ( )
184185 . new_output ( new_manifest_path) ?;
186+ let partition_spec = self
187+ . tx
188+ . current_table
189+ . metadata ( )
190+ . partition_spec_by_id ( partition_spec_id)
191+ . ok_or_else ( || {
192+ Error :: new (
193+ ErrorKind :: DataInvalid ,
194+ "Invalid partition spec id for new manifest writer" ,
195+ )
196+ . with_context ( "partition spec id" , partition_spec_id. to_string ( ) )
197+ } ) ?
198+ . as_ref ( )
199+ . clone ( ) ;
185200 let builder = ManifestWriterBuilder :: new (
186201 output,
187202 Some ( self . snapshot_id ) ,
188203 self . key_metadata . clone ( ) ,
189204 self . tx . current_table . metadata ( ) . current_schema ( ) . clone ( ) ,
190- self . tx
191- . current_table
192- . metadata ( )
193- . default_partition_spec ( )
194- . as_ref ( )
195- . clone ( ) ,
205+ partition_spec,
196206 ) ;
197207 if self . tx . current_table . metadata ( ) . format_version ( ) == FormatVersion :: V1 {
198208 Ok ( builder. build_v1 ( ) )
@@ -244,29 +254,95 @@ impl<'a> SnapshotProduceAction<'a> {
244254 builder. build ( )
245255 }
246256 } ) ;
247- let mut writer = self . new_manifest_writer ( & content_type) ?;
257+ let mut writer = self . new_manifest_writer (
258+ & content_type,
259+ self . tx . current_table . metadata ( ) . default_partition_spec_id ( ) ,
260+ ) ?;
248261 for entry in manifest_entries {
249262 writer. add_entry ( entry) ?;
250263 }
251264 writer. write_manifest_file ( ) . await
252265 }
253266
267+ async fn write_delete_manifest (
268+ & mut self ,
269+ deleted_entries : Vec < ManifestEntry > ,
270+ ) -> Result < Vec < ManifestFile > > {
271+ if deleted_entries. is_empty ( ) {
272+ return Ok ( vec ! [ ] ) ;
273+ }
274+
275+ // Group deleted entries by spec_id
276+ let mut partition_groups = HashMap :: new ( ) ;
277+ for entry in deleted_entries {
278+ partition_groups
279+ . entry ( entry. data_file ( ) . partition_spec_id )
280+ . or_insert_with ( Vec :: new)
281+ . push ( entry) ;
282+ }
283+
284+ // Write a delete manifest per spec_id group
285+ let mut deleted_manifests = Vec :: new ( ) ;
286+ for ( spec_id, entries) in partition_groups {
287+ let mut data_file_writer: Option < ManifestWriter > = None ;
288+ let mut delete_file_writer: Option < ManifestWriter > = None ;
289+ for entry in entries {
290+ match entry. content_type ( ) {
291+ DataContentType :: Data => {
292+ if data_file_writer. is_none ( ) {
293+ data_file_writer = Some (
294+ self . new_manifest_writer ( & ManifestContentType :: Data , spec_id) ?,
295+ ) ;
296+ }
297+ data_file_writer. as_mut ( ) . unwrap ( ) . add_delete_entry ( entry) ?;
298+ }
299+ DataContentType :: EqualityDeletes | DataContentType :: PositionDeletes => {
300+ if delete_file_writer. is_none ( ) {
301+ delete_file_writer = Some (
302+ self . new_manifest_writer ( & ManifestContentType :: Deletes , spec_id) ?,
303+ ) ;
304+ }
305+ delete_file_writer
306+ . as_mut ( )
307+ . unwrap ( )
308+ . add_delete_entry ( entry) ?;
309+ }
310+ }
311+ }
312+ if let Some ( writer) = data_file_writer {
313+ deleted_manifests. push ( writer. write_manifest_file ( ) . await ?) ;
314+ }
315+ if let Some ( writer) = delete_file_writer {
316+ deleted_manifests. push ( writer. write_manifest_file ( ) . await ?) ;
317+ }
318+ }
319+
320+ Ok ( deleted_manifests)
321+ }
322+
254323 async fn manifest_file < OP : SnapshotProduceOperation , MP : ManifestProcess > (
255324 & mut self ,
256325 snapshot_produce_operation : & OP ,
257326 manifest_process : & MP ,
258327 ) -> Result < Vec < ManifestFile > > {
259328 let mut manifest_files = vec ! [ ] ;
260329 let data_files = std:: mem:: take ( & mut self . added_data_files ) ;
261- let delete_files = std:: mem:: take ( & mut self . added_delete_files ) ;
330+ let added_delete_files = std:: mem:: take ( & mut self . added_delete_files ) ;
262331 if !data_files. is_empty ( ) {
263332 let added_manifest = self . write_added_manifest ( data_files) . await ?;
264333 manifest_files. push ( added_manifest) ;
265334 }
266- if !delete_files. is_empty ( ) {
267- let added_delete_manifest = self . write_added_manifest ( delete_files) . await ?;
335+
336+ if !added_delete_files. is_empty ( ) {
337+ let added_delete_manifest = self . write_added_manifest ( added_delete_files) . await ?;
268338 manifest_files. push ( added_delete_manifest) ;
269339 }
340+
341+ let delete_manifests = self
342+ . write_delete_manifest ( snapshot_produce_operation. delete_entries ( self ) . await ?)
343+ . await ?;
344+ manifest_files. extend ( delete_manifests) ;
345+
270346 let existing_manifests = snapshot_produce_operation. existing_manifest ( self ) . await ?;
271347
272348 manifest_files. extend ( existing_manifests) ;
@@ -505,7 +581,7 @@ impl MergeManifestManager {
505581 Box < dyn Future < Output = Result < Vec < ManifestFile > > > + Send > ,
506582 > )
507583 } else {
508- let writer = snapshot_produce. new_manifest_writer ( & self . content ) ?;
584+ let writer = snapshot_produce. new_manifest_writer ( & self . content , snapshot_produce . tx . current_table . metadata ( ) . default_partition_spec_id ( ) ) ?;
509585 let snapshot_id = snapshot_produce. snapshot_id ;
510586 let file_io = snapshot_produce. tx . current_table . file_io ( ) . clone ( ) ;
511587 Ok ( ( Box :: pin ( async move {
0 commit comments