1717
//! The table implementation.
1919
20- use std:: collections:: HashMap ;
21- use std:: { any:: Any , str:: FromStr , sync:: Arc } ;
22-
2320use super :: helpers:: { expr_applicable_for_cols, pruned_partition_list} ;
2421use super :: { ListingTableUrl , PartitionedFile } ;
22+ use std:: collections:: HashMap ;
23+ use std:: { any:: Any , str:: FromStr , sync:: Arc } ;
2524
2625use crate :: datasource:: {
2726 create_ordering,
2827 file_format:: {
2928 file_compression_type:: FileCompressionType , FileFormat , FilePushdownSupport ,
3029 } ,
31- get_statistics_with_limit,
3230 physical_plan:: FileSinkConfig ,
3331} ;
3432use crate :: execution:: context:: SessionState ;
@@ -55,9 +53,12 @@ use datafusion_physical_expr::{
5553
5654use async_trait:: async_trait;
5755use datafusion_catalog:: Session ;
56+ use datafusion_common:: stats:: Precision ;
57+ use datafusion_datasource:: add_row_stats;
58+ use datafusion_datasource:: compute_all_files_statistics;
5859use datafusion_datasource:: file_groups:: FileGroup ;
5960use datafusion_physical_expr_common:: sort_expr:: LexRequirement ;
60- use futures:: { future, stream, StreamExt , TryStreamExt } ;
61+ use futures:: { future, stream, Stream , StreamExt , TryStreamExt } ;
6162use itertools:: Itertools ;
6263use object_store:: ObjectStore ;
6364
@@ -1115,32 +1116,26 @@ impl ListingTable {
11151116 let files = file_list
11161117 . map ( |part_file| async {
11171118 let part_file = part_file?;
1118- if self . options . collect_stat {
1119- let statistics =
1120- self . do_collect_statistics ( ctx, & store, & part_file) . await ?;
1121- Ok ( ( part_file, statistics) )
1119+ let statistics = if self . options . collect_stat {
1120+ self . do_collect_statistics ( ctx, & store, & part_file) . await ?
11221121 } else {
1123- Ok ( (
1124- part_file,
1125- Arc :: new ( Statistics :: new_unknown ( & self . file_schema ) ) ,
1126- ) )
1127- }
1122+ Arc :: new ( Statistics :: new_unknown ( & self . file_schema ) )
1123+ } ;
1124+ Ok ( part_file. with_statistics ( statistics) )
11281125 } )
11291126 . boxed ( )
11301127 . buffer_unordered ( ctx. config_options ( ) . execution . meta_fetch_concurrency ) ;
11311128
1132- let ( files, statistics) = get_statistics_with_limit (
1133- files,
1129+ let ( file_group, inexact_stats) =
1130+ get_files_with_limit ( files, limit, self . options . collect_stat ) . await ?;
1131+
1132+ let file_groups = file_group. split_files ( self . options . target_partitions ) ;
1133+ compute_all_files_statistics (
1134+ file_groups,
11341135 self . schema ( ) ,
1135- limit,
11361136 self . options . collect_stat ,
1137+ inexact_stats,
11371138 )
1138- . await ?;
1139-
1140- Ok ( (
1141- files. split_files ( self . options . target_partitions ) ,
1142- statistics,
1143- ) )
11441139 }
11451140
11461141 /// Collects statistics for a given partitioned file.
@@ -1182,6 +1177,86 @@ impl ListingTable {
11821177 }
11831178}
11841179
1180+ /// Processes a stream of partitioned files and returns a `FileGroup` containing the files.
1181+ ///
1182+ /// This function collects files from the provided stream until either:
1183+ /// 1. The stream is exhausted
1184+ /// 2. The accumulated number of rows exceeds the provided `limit` (if specified)
1185+ ///
1186+ /// # Arguments
1187+ /// * `files` - A stream of `Result<PartitionedFile>` items to process
1188+ /// * `limit` - An optional row count limit. If provided, the function will stop collecting files
1189+ /// once the accumulated number of rows exceeds this limit
1190+ /// * `collect_stats` - Whether to collect and accumulate statistics from the files
1191+ ///
1192+ /// # Returns
1193+ /// A `Result` containing a `FileGroup` with the collected files
1194+ /// and a boolean indicating whether the statistics are inexact.
1195+ ///
1196+ /// # Note
1197+ /// The function will continue processing files if statistics are not available or if the
1198+ /// limit is not provided. If `collect_stats` is false, statistics won't be accumulated
1199+ /// but files will still be collected.
1200+ async fn get_files_with_limit (
1201+ files : impl Stream < Item = Result < PartitionedFile > > ,
1202+ limit : Option < usize > ,
1203+ collect_stats : bool ,
1204+ ) -> Result < ( FileGroup , bool ) > {
1205+ let mut file_group = FileGroup :: default ( ) ;
1206+ // Fusing the stream allows us to call next safely even once it is finished.
1207+ let mut all_files = Box :: pin ( files. fuse ( ) ) ;
1208+ let mut num_rows = Precision :: < usize > :: Absent ;
1209+ while let Some ( first_file) = all_files. next ( ) . await {
1210+ let file = first_file?;
1211+ if let Some ( file_statistic) = & file. statistics {
1212+ num_rows = file_statistic. num_rows ;
1213+ }
1214+ file_group. push ( file) ;
1215+
1216+ // If the number of rows exceeds the limit, we can stop processing
1217+ // files. This only applies when we know the number of rows. It also
1218+ // currently ignores tables that have no statistics regarding the
1219+ // number of rows.
1220+ let conservative_num_rows = match num_rows {
1221+ Precision :: Exact ( nr) => nr,
1222+ _ => usize:: MIN ,
1223+ } ;
1224+ if conservative_num_rows <= limit. unwrap_or ( usize:: MAX ) {
1225+ while let Some ( current) = all_files. next ( ) . await {
1226+ let file = current?;
1227+ if !collect_stats {
1228+ file_group. push ( file) ;
1229+ continue ;
1230+ }
1231+
1232+ // We accumulate the number of rows, total byte size and null
1233+ // counts across all the files in question. If any file does not
1234+ // provide any information or provides an inexact value, we demote
1235+ // the statistic precision to inexact.
1236+ if let Some ( file_stats) = & file. statistics {
1237+ num_rows = add_row_stats ( num_rows, file_stats. num_rows ) ;
1238+ }
1239+ file_group. push ( file) ;
1240+
1241+ // If the number of rows exceeds the limit, we can stop processing
1242+ // files. This only applies when we know the number of rows. It also
1243+ // currently ignores tables that have no statistics regarding the
1244+ // number of rows.
1245+ if num_rows. get_value ( ) . unwrap_or ( & usize:: MIN )
1246+ > & limit. unwrap_or ( usize:: MAX )
1247+ {
1248+ break ;
1249+ }
1250+ }
1251+ }
1252+ }
1253+ // If we still have files in the stream, it means that the limit kicked
1254+ // in, and the statistic could have been different had we processed the
1255+ // files in a different order.
1256+ let inexact_stats = all_files. next ( ) . await . is_some ( ) ;
1257+ Ok ( ( file_group, inexact_stats) )
1258+ }
1259+
11851260#[ cfg( test) ]
11861261mod tests {
11871262 use super :: * ;
0 commit comments