@@ -24,18 +24,24 @@ use crate::expr::visitors::manifest_evaluator::ManifestEvaluator;
2424use crate :: expr:: { Bind , BoundPredicate , Predicate } ;
2525use crate :: io:: FileIO ;
2626use crate :: spec:: {
27- DataContentType , ManifestContentType , ManifestEntryRef , ManifestFile , Schema , SchemaRef ,
28- SnapshotRef , TableMetadataRef ,
27+ DataContentType , ManifestContentType , ManifestEntryRef , ManifestFile , ManifestList , Schema ,
28+ SchemaRef , SnapshotRef , TableMetadataRef ,
2929} ;
3030use crate :: table:: Table ;
3131use crate :: { Error , ErrorKind , Result } ;
3232use arrow_array:: RecordBatch ;
33- use async_stream :: try_stream ;
33+ use futures :: channel :: mpsc :: { channel , Sender } ;
3434use futures:: stream:: BoxStream ;
35- use futures:: StreamExt ;
35+ use futures:: { SinkExt , StreamExt , TryStreamExt } ;
3636use std:: collections:: hash_map:: Entry ;
3737use std:: collections:: HashMap ;
3838use std:: sync:: Arc ;
39+ use tokio:: spawn;
40+
41+ const CHANNEL_BUFFER_SIZE : usize = 10 ;
42+ const CONCURRENCY_LIMIT_MANIFEST_FILES : usize = 10 ;
43+ const CONCURRENCY_LIMIT_MANIFEST_ENTRIES : usize = 10 ;
44+ const SUPPORTED_MANIFEST_FILE_CONTENT_TYPES : [ ManifestContentType ; 1 ] = [ ManifestContentType :: Data ] ;
3945
4046/// A stream of [`FileScanTask`].
4147pub type FileScanTaskStream = BoxStream < ' static , Result < FileScanTask > > ;
@@ -206,76 +212,20 @@ impl TableScan {
206212 self . case_sensitive ,
207213 ) ?;
208214
209- let mut partition_filter_cache = PartitionFilterCache :: new ( ) ;
210- let mut manifest_evaluator_cache = ManifestEvaluatorCache :: new ( ) ;
211-
212- Ok ( try_stream ! {
213- let manifest_list = context
214- . snapshot
215- . load_manifest_list( & context. file_io, & context. table_metadata)
216- . await ?;
217-
218- for entry in manifest_list. entries( ) {
219- if !Self :: content_type_is_data( entry) {
220- continue ;
221- }
215+ let ( sender, receiver) = channel ( CHANNEL_BUFFER_SIZE ) ;
222216
223- let partition_spec_id = entry. partition_spec_id;
217+ let manifest_list = context
218+ . snapshot
219+ . load_manifest_list ( & context. file_io , & context. table_metadata )
220+ . await ?;
224221
225- let partition_filter = partition_filter_cache. get(
226- partition_spec_id,
227- & context,
228- ) ?;
222+ spawn ( async move {
223+ let _ = ConcurrentFileScanStreamContext :: new ( context, sender)
224+ . run ( manifest_list)
225+ . await ;
226+ } ) ;
229227
230- if let Some ( partition_filter) = partition_filter {
231- let manifest_evaluator = manifest_evaluator_cache. get(
232- partition_spec_id,
233- partition_filter,
234- ) ;
235-
236- if !manifest_evaluator. eval( entry) ? {
237- continue ;
238- }
239- }
240-
241- let manifest = entry. load_manifest( & context. file_io) . await ?;
242- let mut manifest_entries_stream =
243- futures:: stream:: iter( manifest. entries( ) . iter( ) . filter( |e| e. is_alive( ) ) ) ;
244-
245- while let Some ( manifest_entry) = manifest_entries_stream. next( ) . await {
246- // TODO: Apply ExpressionEvaluator
247-
248- if let Some ( bound_predicate) = context. bound_filter( ) {
249- // reject any manifest entries whose data file's metrics don't match the filter.
250- if !InclusiveMetricsEvaluator :: eval(
251- bound_predicate,
252- manifest_entry. data_file( ) ,
253- false
254- ) ? {
255- continue ;
256- }
257- }
258-
259- match manifest_entry. content_type( ) {
260- DataContentType :: EqualityDeletes | DataContentType :: PositionDeletes => {
261- yield Err ( Error :: new(
262- ErrorKind :: FeatureUnsupported ,
263- "Delete files are not supported yet." ,
264- ) ) ?;
265- }
266- DataContentType :: Data => {
267- let scan_task: Result <FileScanTask > = Ok ( FileScanTask {
268- data_manifest_entry: manifest_entry. clone( ) ,
269- start: 0 ,
270- length: manifest_entry. file_size_in_bytes( ) ,
271- } ) ;
272- yield scan_task?;
273- }
274- }
275- }
276- }
277- }
278- . boxed ( ) )
228+ return Ok ( receiver. boxed ( ) ) ;
279229 }
280230
281231 /// Returns an [`ArrowRecordBatchStream`].
@@ -333,13 +283,172 @@ impl TableScan {
333283
334284 arrow_reader_builder. build ( ) . read ( self . plan_files ( ) . await ?)
335285 }
286+ }
287+
288+ #[ derive( Debug ) ]
289+ struct ConcurrentFileScanStreamContext {
290+ context : Arc < FileScanStreamContext > ,
291+ sender : Sender < Result < FileScanTask > > ,
292+ manifest_evaluator_cache : ManifestEvaluatorCache ,
293+ partition_filter_cache : PartitionFilterCache ,
294+ }
295+
296+ impl ConcurrentFileScanStreamContext {
297+ fn new ( context : FileScanStreamContext , sender : Sender < Result < FileScanTask > > ) -> Self {
298+ ConcurrentFileScanStreamContext {
299+ context : Arc :: new ( context) ,
300+ sender,
301+ manifest_evaluator_cache : ManifestEvaluatorCache :: new ( ) ,
302+ partition_filter_cache : PartitionFilterCache :: new ( ) ,
303+ }
304+ }
305+
306+ async fn run ( & mut self , manifest_list : ManifestList ) -> Result < ( ) > {
307+ let file_io = self . context . file_io . clone ( ) ;
308+ let sender = self . sender . clone ( ) ;
309+ let context = self . context . clone ( ) ;
310+
311+ // This whole Vec-and-for-loop approach feels sub-optimally structured.
312+ // I've tried structuring this in multiple ways but run into
313+ // issues with ownership. Ideally I'd like to structure this
314+ // with a functional programming approach: extracting
315+ // sections 1, 2, and 3 out into different methods on Self,
316+ // and then use some iterator combinators to orchestrate it all.
317+ // Section 1 is pretty trivially refactorable into a static method
318+ // that can be used in a closure that can be used with Iterator::filter.
319+ // Similarly, section 3 seems easily factor-able into a method that can
320+ // be passed into Iterator::map.
321+ // Section 2 turns out trickier - we want to exit the entire `run` method early
322+ // if the eval fails, and filter out any manifest_files from the iterator / stream
323+ // if the eval succeeds but returns true. We bump into ownership issues due
324+ // to needing to pass mut self as the caches need to be able to mutate.
325+ // Section 3 runs into ownership issues when trying to refactor its closure to be
326+ // a static or non-static method.
327+
328+ // 1
329+ let filtered_manifest_files = manifest_list
330+ . entries ( )
331+ . iter ( )
332+ . filter ( Self :: reject_unsupported_content_types) ;
333+
334+ // 2
335+ let mut filtered_manifest_files2 = vec ! [ ] ;
336+ for manifest_file in filtered_manifest_files {
337+ if !self . apply_evaluator ( manifest_file) ? {
338+ continue ;
339+ }
340+
341+ filtered_manifest_files2. push ( manifest_file) ;
342+ }
343+
344+ // 3
345+ let filtered_manifest_files = filtered_manifest_files2. into_iter ( ) . map ( |manifest_file| {
346+ Ok ( (
347+ manifest_file,
348+ file_io. clone ( ) ,
349+ sender. clone ( ) ,
350+ context. clone ( ) ,
351+ ) )
352+ } ) ;
353+
354+ futures:: stream:: iter ( filtered_manifest_files)
355+ . try_for_each_concurrent (
356+ CONCURRENCY_LIMIT_MANIFEST_FILES ,
357+ Self :: process_manifest_file,
358+ )
359+ . await
360+ }
361+
362+ fn reject_unsupported_content_types ( manifest_file : & & ManifestFile ) -> bool {
363+ SUPPORTED_MANIFEST_FILE_CONTENT_TYPES . contains ( & manifest_file. content )
364+ }
365+
366+ fn apply_evaluator ( & mut self , manifest_file : & ManifestFile ) -> Result < bool > {
367+ let partition_spec_id = manifest_file. partition_spec_id ;
368+
369+ let partition_filter = self
370+ . partition_filter_cache
371+ . get ( partition_spec_id, & self . context ) ?;
372+
373+ if let Some ( partition_filter) = partition_filter {
374+ let manifest_evaluator = self
375+ . manifest_evaluator_cache
376+ . get ( partition_spec_id, partition_filter) ;
377+
378+ if !manifest_evaluator. eval ( manifest_file) ? {
379+ return Ok ( false ) ;
380+ }
381+ }
382+
383+ Ok ( true )
384+ }
385+
386+ async fn process_manifest_file (
387+ manifest_and_file_io_and_sender : (
388+ & ManifestFile ,
389+ FileIO ,
390+ Sender < Result < FileScanTask > > ,
391+ Arc < FileScanStreamContext > ,
392+ ) ,
393+ ) -> Result < ( ) > {
394+ let ( manifest_file, file_io, sender, context) = manifest_and_file_io_and_sender;
395+
396+ let manifest = manifest_file. load_manifest ( & file_io) . await ?;
397+
398+ let manifest_entries = manifest
399+ . entries ( )
400+ . iter ( )
401+ . filter ( |x| x. is_alive ( ) )
402+ . map ( |manifest_entry| Ok ( ( manifest_entry, sender. clone ( ) , context. clone ( ) ) ) ) ;
403+
404+ futures:: stream:: iter ( manifest_entries)
405+ . try_for_each_concurrent (
406+ CONCURRENCY_LIMIT_MANIFEST_ENTRIES ,
407+ Self :: process_manifest_entry,
408+ )
409+ . await
410+ }
411+
412+ async fn process_manifest_entry (
413+ manifest_entry_and_sender : (
414+ & ManifestEntryRef ,
415+ Sender < Result < FileScanTask > > ,
416+ Arc < FileScanStreamContext > ,
417+ ) ,
418+ ) -> Result < ( ) > {
419+ let ( manifest_entry, mut sender, context) = manifest_entry_and_sender;
420+
421+ if !matches ! ( manifest_entry. content_type( ) , DataContentType :: Data ) {
422+ return Err ( Error :: new (
423+ ErrorKind :: FeatureUnsupported ,
424+ format ! (
425+ "Files of type '{:?}' are not supported yet." ,
426+ manifest_entry. content_type( )
427+ ) ,
428+ ) ) ;
429+ }
430+
431+ if let Some ( bound_predicate) = context. bound_filter ( ) {
432+ // reject any manifest entries whose data file's metrics don't match the filter.
433+ if !InclusiveMetricsEvaluator :: eval ( bound_predicate, manifest_entry. data_file ( ) , false ) ?
434+ {
435+ return Ok ( ( ) ) ;
436+ }
437+ }
438+
439+ // TODO: Apply ExpressionEvaluator
440+
441+ Ok ( sender
442+ . send ( Ok ( Self :: manifest_entry_to_file_scan_task ( manifest_entry) ) )
443+ . await ?)
444+ }
336445
337- /// Checks whether the [`ManifestContentType`] is `Data` or not.
338- fn content_type_is_data ( entry : & ManifestFile ) -> bool {
339- if let ManifestContentType :: Data = entry. content {
340- return true ;
446+ fn manifest_entry_to_file_scan_task ( manifest_entry : & ManifestEntryRef ) -> FileScanTask {
447+ FileScanTask {
448+ data_manifest_entry : manifest_entry. clone ( ) ,
449+ start : 0 ,
450+ length : manifest_entry. file_size_in_bytes ( ) ,
341451 }
342- false
343452 }
344453}
345454
0 commit comments