@@ -138,73 +138,75 @@ impl CachingDeleteFileManager {
138138 }
139139 }
140140
141+ /// Load the deletes for all the specified tasks
142+ ///
143+ /// Returned future completes once all loading has finished.
144+ ///
145+ /// * Create a single stream of all delete file tasks irrespective of type,
146+ /// so that we can respect the combined concurrency limit
147+ /// * We then process each in two phases: load and parse.
148+ /// * for positional deletes the load phase instantiates an ArrowRecordBatchStream to
149+ /// stream the file contents out
150+ /// * for eq deletes, we first check if the EQ delete is already loaded or being loaded by
151+ /// another concurrently processing data file scan task. If it is, we return a future
152+ /// for the pre-existing task from the load phase. If not, we create such a future
153+ /// and store it in the state to prevent other data file tasks from starting to load
154+ /// the same equality delete file, and return a record batch stream from the load phase
155+ /// as per the other delete file types - only this time it is accompanied by a one-shot
156+ /// channel sender that we will eventually use to resolve the shared future that we stored
157+ /// in the state.
158+ /// * When this gets updated to add support for delete vectors, the load phase will return
159+ /// a PuffinReader for them.
160+ /// * The parse phase parses each record batch stream according to its associated data type.
161+ /// The result of this is a map of data file paths to delete vectors for the positional
162+ /// delete tasks (and in future for the delete vector tasks). For equality delete
163+ /// file tasks, this results in an unbound Predicate.
164+ /// * The unbound Predicates resulting from equality deletes are sent to their associated oneshot
165+ /// channel to store them in the right place in the delete file manager's state.
166+ /// * The results of all of these futures are awaited on in parallel with the specified
167+ /// level of concurrency and collected into a vec. We then combine all of the delete
168+ /// vector maps that resulted from any positional delete or delete vector files into a
169+ /// single map and persist it in the state.
170+ ///
171+ ///
172+ /// Conceptually, the data flow is like this:
173+ /// ```none
174+ /// FileScanTaskDeleteFile
175+ /// |
176+ /// Already-loading EQ Delete | Everything Else
177+ /// +---------------------------------------------------+
178+ /// | |
179+ /// [get existing future] [load recordbatch stream / puffin]
180+ /// DeleteFileContext::InProgEqDel DeleteFileContext
181+ /// | |
182+ /// | |
183+ /// | +-----------------------------+--------------------------+
184+ /// | Pos Del Del Vec (Not yet Implemented) EQ Del
185+ /// | | | |
186+ /// | [parse pos del stream] [parse del vec puffin] [parse eq del]
187+ /// | HashMap<String, RoaringTreeMap> HashMap<String, RoaringTreeMap> (Predicate, Sender)
188+ /// | | | |
189+ /// | | | [persist to state]
190+ /// | | | ()
191+ /// | | | |
192+ /// | +-----------------------------+--------------------------+
193+ /// | |
194+ /// | [buffer unordered]
195+ /// | |
196+ /// | [combine del vectors]
197+ /// | HashMap<String, RoaringTreeMap>
198+ /// | |
199+ /// | [persist del vectors to state]
200+ /// | ()
201+ /// | |
202+ /// +-------------------------+-------------------------+
203+ /// |
204+ /// [join!]
205+ /// ```
141206 pub ( crate ) async fn load_deletes (
142207 & self ,
143208 delete_file_entries : & [ FileScanTaskDeleteFile ] ,
144209 ) -> Result < ( ) > {
145- /*
146- * Create a single stream of all delete file tasks irrespective of type,
147- so that we can respect the combined concurrency limit
148- * We then process each in two phases: load and parse.
149- * for positional deletes the load phase instantiates an ArrowRecordBatchStream to
150- stream the file contents out
151- * for eq deletes, we first check if the EQ delete is already loaded or being loaded by
152- another concurrently processing data file scan task. If it is, we return a future
153- for the pre-existing task from the load phase. If not, we create such a future
154- and store it in the state to prevent other data file tasks from starting to load
155- the same equality delete file, and return a record batch stream from the load phase
156- as per the other delete file types - only this time it is accompanied by a one-shot
157- channel sender that we will eventually use to resolve the shared future that we stored
158- in the state.
159- * When this gets updated to add support for delete vectors, the load phase will return
160- a PuffinReader for them.
161- * The parse phase parses each record batch stream according to its associated data type.
162- The result of this is a map of data file paths to delete vectors for the positional
163- delete tasks (and in future for the delete vector tasks). For equality delete
164- file tasks, this results in an unbound Predicate.
165- * The unbound Predicates resulting from equality deletes are sent to their associated oneshot
166- channel to store them in the right place in the delete file manager's state.
167- * The results of all of these futures are awaited on in parallel with the specified
168- level of concurrency and collected into a vec. We then combine all of the delete
169- vector maps that resulted from any positional delete or delete vector files into a
170- single map and persist it in the state.
171-
172-
173- Conceptually, the data flow is like this:
174-
175- FileScanTaskDeleteFile
176- |
177- Already-loading EQ Delete | Everything Else
178- +---------------------------------------------------+
179- | |
180- [get existing future] [load recordbatch stream / puffin]
181- DeleteFileContext::InProgEqDel DeleteFileContext
182- | |
183- | |
184- | +-----------------------------+--------------------------+
185- | Pos Del Del Vec (Not yet Implemented) EQ Del
186- | | | |
187- | [parse pos del stream] [parse del vec puffin] [parse eq del]
188- | HashMap<String, RoaringTreeMap> HashMap<String, RoaringTreeMap> (Predicate, Sender)
189- | | | |
190- | | | [persist to state]
191- | | | ()
192- | | | |
193- | +-----------------------------+--------------------------+
194- | |
195- | [buffer unordered]
196- | |
197- | [combine del vectors]
198- | HashMap<String, RoaringTreeMap>
199- | |
200- | [persist del vectors to state]
201- | ()
202- | |
203- +-------------------------+-------------------------+
204- |
205- [join!]
206- */
207-
208210 let stream_items = delete_file_entries
209211 . iter ( )
210212 . map ( |t| ( t. clone ( ) , self . file_io . clone ( ) , self . state . clone ( ) ) )
@@ -253,18 +255,20 @@ impl CachingDeleteFileManager {
253255 ) ) ,
254256
255257 DataContentType :: EqualityDeletes => {
256- let ( sender, fut) = EqDelFuture :: new ( ) ;
257- {
258+ let sender = {
258259 let mut state = state. write ( ) . unwrap ( ) ;
259-
260260 if let Some ( existing) = state. equality_deletes . get ( & task. file_path ) {
261261 return Ok ( DeleteFileContext :: InProgEqDel ( existing. clone ( ) ) ) ;
262262 }
263263
264+ let ( sender, fut) = EqDelFuture :: new ( ) ;
265+
264266 state
265267 . equality_deletes
266268 . insert ( task. file_path . to_string ( ) , fut) ;
267- }
269+
270+ sender
271+ } ;
268272
269273 Ok ( DeleteFileContext :: FreshEqDel {
270274 batch_stream : Self :: parquet_to_batch_stream ( & task. file_path , file_io) . await ?,
0 commit comments