From c902a6bfa3d655714ac6ad93ce3feb1e453c769f Mon Sep 17 00:00:00 2001 From: xudong963 Date: Wed, 26 Mar 2025 16:52:15 +0800 Subject: [PATCH 1/4] Support computing statistics for FileGroup --- .../core/src/datasource/listing/table.rs | 126 ++++++++++++++---- datafusion/datasource/src/file_groups.rs | 5 + datafusion/datasource/src/file_scan_config.rs | 4 +- datafusion/datasource/src/mod.rs | 8 +- .../proto/src/physical_plan/from_proto.rs | 6 +- .../proto/src/physical_plan/to_proto.rs | 2 +- 6 files changed, 123 insertions(+), 28 deletions(-) diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs index 61eeb419a480..b8eb5eb1ff5a 100644 --- a/datafusion/core/src/datasource/listing/table.rs +++ b/datafusion/core/src/datasource/listing/table.rs @@ -17,18 +17,16 @@ //! The table implementation. -use std::collections::HashMap; -use std::{any::Any, str::FromStr, sync::Arc}; - use super::helpers::{expr_applicable_for_cols, pruned_partition_list}; use super::{ListingTableUrl, PartitionedFile}; +use std::collections::HashMap; +use std::{any::Any, str::FromStr, sync::Arc}; use crate::datasource::{ create_ordering, file_format::{ file_compression_type::FileCompressionType, FileFormat, FilePushdownSupport, }, - get_statistics_with_limit, physical_plan::FileSinkConfig, }; use crate::execution::context::SessionState; @@ -53,11 +51,13 @@ use datafusion_physical_expr::{ create_physical_expr, LexOrdering, PhysicalSortRequirement, }; +use crate::datasource::statistics::compute_all_files_statistics; use async_trait::async_trait; use datafusion_catalog::Session; +use datafusion_common::stats::Precision; use datafusion_datasource::file_groups::FileGroup; use datafusion_physical_expr_common::sort_expr::LexRequirement; -use futures::{future, stream, StreamExt, TryStreamExt}; +use futures::{future, stream, Stream, StreamExt, TryStreamExt}; use itertools::Itertools; use object_store::ObjectStore; @@ -1115,32 +1115,26 @@ impl ListingTable { let files = file_list .map(|part_file| async { let part_file = part_file?; - if self.options.collect_stat { - let statistics = - self.do_collect_statistics(ctx, &store, &part_file).await?; - Ok((part_file, statistics)) + let statistics = if self.options.collect_stat { + self.do_collect_statistics(ctx, &store, &part_file).await? } else { - Ok(( - part_file, - Arc::new(Statistics::new_unknown(&self.file_schema)), - )) - } + Arc::new(Statistics::new_unknown(&self.file_schema)) + }; + Ok(part_file.with_statistics(statistics)) }) .boxed() .buffer_unordered(ctx.config_options().execution.meta_fetch_concurrency); - let (files, statistics) = get_statistics_with_limit( - files, + let (file_group, inexact_stats) = + get_files_with_limit(files, limit, self.options.collect_stat).await?; + + let file_groups = file_group.split_files(self.options.target_partitions); + compute_all_files_statistics( + file_groups, self.schema(), - limit, self.options.collect_stat, + inexact_stats, ) - .await?; - - Ok(( - files.split_files(self.options.target_partitions), - statistics, - )) } /// Collects statistics for a given partitioned file. @@ -1182,6 +1176,92 @@ impl ListingTable { } } +/// Processes a stream of partitioned files and returns a `FileGroup` containing the files. +/// +/// This function collects files from the provided stream until either: +/// 1. The stream is exhausted +/// 2. 
The accumulated number of rows exceeds the provided `limit` (if specified)
+///
+/// # Arguments
+/// * `files` - A stream of `Result<PartitionedFile>` items to process
+/// * `limit` - An optional row count limit. If provided, the function will stop collecting files
+///   once the accumulated number of rows exceeds this limit
+/// * `collect_stats` - Whether to collect and accumulate statistics from the files
+///
+/// # Returns
+/// A `Result` containing a `FileGroup` with the collected files
+/// and a boolean indicating whether the statistics are inexact.
+///
+/// # Note
+/// The function will continue processing files if statistics are not available or if the
+/// limit is not provided. If `collect_stats` is false, statistics won't be accumulated
+/// but files will still be collected.
+async fn get_files_with_limit(
+    files: impl Stream<Item = Result<PartitionedFile>>,
+    limit: Option<usize>,
+    collect_stats: bool,
+) -> Result<(FileGroup, bool)> {
+    let mut file_group = FileGroup::default();
+    // Fusing the stream allows us to call next safely even once it is finished.
+    let mut all_files = Box::pin(files.fuse());
+    let mut num_rows = Precision::<usize>::Absent;
+    while let Some(first_file) = all_files.next().await {
+        let file = first_file?;
+        if let Some(file_statistic) = &file.statistics {
+            num_rows = file_statistic.num_rows;
+        }
+        file_group.push(file);
+
+        // If the number of rows exceeds the limit, we can stop processing
+        // files. This only applies when we know the number of rows. It also
+        // currently ignores tables that have no statistics regarding the
+        // number of rows.
+        let conservative_num_rows = match num_rows {
+            Precision::Exact(nr) => nr,
+            _ => usize::MIN,
+        };
+        if conservative_num_rows <= limit.unwrap_or(usize::MAX) {
+            while let Some(current) = all_files.next().await {
+                let file = current?;
+                if !collect_stats {
+                    file_group.push(file);
+                    continue;
+                }
+
+                // We accumulate the number of rows, total byte size and null
+                // counts across all the files in question. If any file does not
+                // provide any information or provides an inexact value, we demote
+                // the statistic precision to inexact.
+                if let Some(file_stats) = &file.statistics {
+                    num_rows = crate::datasource::statistics::add_row_stats(
+                        num_rows,
+                        file_stats.num_rows,
+                    );
+                }
+                file_group.push(file);
+
+                // If the number of rows exceeds the limit, we can stop processing
+                // files. This only applies when we know the number of rows. It also
+                // currently ignores tables that have no statistics regarding the
+                // number of rows.
+                if num_rows.get_value().unwrap_or(&usize::MIN)
+                    > &limit.unwrap_or(usize::MAX)
+                {
+                    break;
+                }
+            }
+        }
+    }
+    let mut inexact_stats = false;
+    if all_files.next().await.is_some() {
+        // If we still have files in the stream, it means that the limit kicked
+        // in, and the statistic could have been different had we processed the
+        // files in a different order.
+        inexact_stats = true;
+    }
+    Ok((file_group, inexact_stats))
+}
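// ---------------------------------------------------------------------------
// Editorial sketch (illustration only, not part of this patch): the limit
// check above is deliberately conservative -- only an *exact* row count can
// trigger the early exit. `MiniPrecision` is a hypothetical, simplified
// stand-in for `datafusion_common::stats::Precision<usize>`; its `add` mirrors
// `Precision::add` (the sum stays `Exact` only when both sides are `Exact`).
#[derive(Clone, Copy, Debug, PartialEq)]
enum MiniPrecision {
    Exact(usize),
    Inexact(usize),
    Absent,
}

impl MiniPrecision {
    fn add(self, other: MiniPrecision) -> MiniPrecision {
        use MiniPrecision::*;
        match (self, other) {
            (Exact(a), Exact(b)) => Exact(a + b),
            (Exact(a) | Inexact(a), Exact(b) | Inexact(b)) => Inexact(a + b),
            _ => Absent,
        }
    }
}

fn main() {
    let limit = Some(100usize);
    // Row counts reported by three files; the third file's count is unknown.
    let files = [
        MiniPrecision::Exact(60),
        MiniPrecision::Exact(50),
        MiniPrecision::Absent,
    ];
    let mut num_rows = MiniPrecision::Absent;
    let mut files_read = 0;
    for (idx, stats) in files.into_iter().enumerate() {
        num_rows = if idx == 0 { stats } else { num_rows.add(stats) };
        files_read += 1;
        // Conservative: an inexact or absent count never triggers the exit.
        let conservative = match num_rows {
            MiniPrecision::Exact(n) => n,
            _ => usize::MIN,
        };
        if conservative > limit.unwrap_or(usize::MAX) {
            break; // 60 + 50 = 110 > 100, so the third file is never read
        }
    }
    assert_eq!(files_read, 2);
    println!("files read: {files_read}, accumulated rows: {num_rows:?}");
}
// ---------------------------------------------------------------------------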
 #[cfg(test)]
 mod tests {
     use super::*;
diff --git a/datafusion/datasource/src/file_groups.rs b/datafusion/datasource/src/file_groups.rs
index 5fe3e25eaa1f..75c4160f145e 100644
--- a/datafusion/datasource/src/file_groups.rs
+++ b/datafusion/datasource/src/file_groups.rs
@@ -418,6 +418,11 @@ impl FileGroup {
         self.files.push(file);
     }
 
+    /// Get the statistics for this group
+    pub fn statistics(&self) -> Option<&Statistics> {
+        self.statistics.as_ref()
+    }
+
     /// Partition the list of files into `n` groups
     pub fn split_files(mut self, n: usize) -> Vec<FileGroup> {
         if self.is_empty() {
diff --git a/datafusion/datasource/src/file_scan_config.rs b/datafusion/datasource/src/file_scan_config.rs
index 5172dafb1f91..729283289caf 100644
--- a/datafusion/datasource/src/file_scan_config.rs
+++ b/datafusion/datasource/src/file_scan_config.rs
@@ -2000,7 +2000,7 @@ mod tests {
                     },
                     partition_values: vec![ScalarValue::from(file.date)],
                     range: None,
-                    statistics: Some(Statistics {
+                    statistics: Some(Arc::new(Statistics {
                         num_rows: Precision::Absent,
                         total_byte_size: Precision::Absent,
                         column_statistics: file
@@ -2020,7 +2020,7 @@ mod tests {
                             .unwrap_or_default()
                     })
                     .collect::<Vec<_>>(),
-                }),
+                })),
                     extensions: None,
                     metadata_size_hint: None,
                 }
diff --git a/datafusion/datasource/src/mod.rs b/datafusion/datasource/src/mod.rs
index 1e7ea1255df7..954cd582bf9a 100644
--- a/datafusion/datasource/src/mod.rs
+++ b/datafusion/datasource/src/mod.rs
@@ -107,7 +107,7 @@ pub struct PartitionedFile {
     ///
     /// DataFusion relies on these statistics for planning (in particular to sort file groups),
     /// so if they are incorrect, incorrect answers may result.
-    pub statistics: Option<Statistics>,
+    pub statistics: Option<Arc<Statistics>>,
     /// An optional field for user defined per object metadata
     pub extensions: Option<Arc<dyn std::any::Any + Send + Sync>>,
     /// The estimated size of the parquet metadata, in bytes
@@ -187,6 +187,12 @@ impl PartitionedFile {
         self.extensions = Some(extensions);
         self
     }
+
+    /// Update the statistics for this file.
+    pub fn with_statistics(mut self, statistics: Arc<Statistics>) -> Self {
+        self.statistics = Some(statistics);
+        self
+    }
 }
 
 impl From<ObjectMeta> for PartitionedFile {
diff --git a/datafusion/proto/src/physical_plan/from_proto.rs b/datafusion/proto/src/physical_plan/from_proto.rs
index d1141060f9e0..c949e3c9f8cb 100644
--- a/datafusion/proto/src/physical_plan/from_proto.rs
+++ b/datafusion/proto/src/physical_plan/from_proto.rs
@@ -565,7 +565,11 @@ impl TryFrom<&protobuf::PartitionedFile> for PartitionedFile {
                 .map(|v| v.try_into())
                 .collect::<Result<Vec<_>, _>>()?,
             range: val.range.as_ref().map(|v| v.try_into()).transpose()?,
-            statistics: val.statistics.as_ref().map(|v| v.try_into()).transpose()?,
+            statistics: val
+                .statistics
+                .as_ref()
+                .map(|v| v.try_into().map(Arc::new))
+                .transpose()?,
             extensions: None,
             metadata_size_hint: None,
         })
diff --git a/datafusion/proto/src/physical_plan/to_proto.rs b/datafusion/proto/src/physical_plan/to_proto.rs
index c196595eeed4..1384e6c0c32b 100644
--- a/datafusion/proto/src/physical_plan/to_proto.rs
+++ b/datafusion/proto/src/physical_plan/to_proto.rs
@@ -449,7 +449,7 @@ impl TryFrom<&PartitionedFile> for protobuf::PartitionedFile {
                 .map(|v| v.try_into())
                 .collect::<Result<Vec<_>, _>>()?,
             range: pf.range.as_ref().map(|r| r.try_into()).transpose()?,
-            statistics: pf.statistics.as_ref().map(|s| s.into()),
+            statistics: pf.statistics.as_ref().map(|s| s.as_ref().into()),
         })
     }
 }

From 2ea8ca213cbbede3650ca95929f1c91b309d12aa Mon Sep 17 00:00:00 2001
From: xudong963
Date: Thu, 27 Mar 2025 22:15:17 +0800
Subject: [PATCH 2/4] avoid clone

---
 datafusion/core/src/datasource/statistics.rs | 356 +++++++++++++++++++
 1 file changed, 356 insertions(+)
 create mode 100644 datafusion/core/src/datasource/statistics.rs

diff --git a/datafusion/core/src/datasource/statistics.rs b/datafusion/core/src/datasource/statistics.rs
new file mode 100644
index 000000000000..cf900afdb990
--- /dev/null
+++ b/datafusion/core/src/datasource/statistics.rs
@@ -0,0 +1,356 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::arrow::datatypes::SchemaRef;
+use crate::error::Result;
+use crate::physical_plan::{ColumnStatistics, Statistics};
+use datafusion_common::stats::Precision;
+use datafusion_common::ScalarValue;
+use datafusion_datasource::file_groups::FileGroup;
+use datafusion_datasource::PartitionedFile;
+use futures::{Stream, StreamExt};
+use std::mem;
+use std::sync::Arc;
+
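// ---------------------------------------------------------------------------
// Editorial aside (illustration only, not part of this patch): the point of
// the "avoid clone" commit is that `PartitionedFile` now carries
// `Arc<Statistics>`, so attaching shared statistics is a reference-count bump
// instead of a deep copy of every `ColumnStatistics`. A hypothetical sketch
// using only the standard library:
fn _arc_vs_deep_clone_sketch() {
    #[derive(Clone)]
    struct BigStats {
        // stand-in for `Statistics::column_statistics`
        column_mins: Vec<String>,
    }

    let stats = std::sync::Arc::new(BigStats {
        column_mins: vec!["min".to_string(); 1024],
    });
    // New code path: cheap O(1) pointer copy, shareable across many files.
    let shared = std::sync::Arc::clone(&stats);
    // Old code path: `file_stats.as_ref().clone()` deep-copied the payload.
    let copied = stats.as_ref().clone();
    assert_eq!(shared.column_mins.len(), copied.column_mins.len());
}
// ---------------------------------------------------------------------------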
+/// Get all files as well as the file-level summary statistics (no statistics for partition columns).
+/// If the optional `limit` is provided, includes only enough files to read up to `limit` rows.
+/// `collect_stats` is passed down from the configuration parameter on `ListingTable`. If it is
+/// false, we only construct bare statistics and skip a potentially expensive call to
+/// `multiunzip` for constructing file-level summary statistics.
+#[deprecated(
+    since = "47.0.0",
+    note = "Please use `get_files_with_limit` and `compute_all_files_statistics` instead"
+)]
+#[allow(unused)]
+pub async fn get_statistics_with_limit(
+    all_files: impl Stream<Item = Result<(PartitionedFile, Arc<Statistics>)>>,
+    file_schema: SchemaRef,
+    limit: Option<usize>,
+    collect_stats: bool,
+) -> Result<(FileGroup, Statistics)> {
+    let mut result_files = FileGroup::default();
+    // These statistics can be calculated as long as at least one file provides
+    // useful information. If none of the files provides any information, then
+    // they will end up having `Precision::Absent` values. Throughout calculations,
+    // missing values will be imputed as:
+    // - zero for summations, and
+    // - neutral element for extreme points.
+    let size = file_schema.fields().len();
+    let mut col_stats_set = vec![ColumnStatistics::default(); size];
+    let mut num_rows = Precision::<usize>::Absent;
+    let mut total_byte_size = Precision::<usize>::Absent;
+
+    // Fusing the stream allows us to call next safely even once it is finished.
+    let mut all_files = Box::pin(all_files.fuse());
+
+    if let Some(first_file) = all_files.next().await {
+        let (mut file, file_stats) = first_file?;
+        file.statistics = Some(Arc::clone(&file_stats));
+        result_files.push(file);
+
+        // First file, we set them directly from the file statistics.
+        num_rows = file_stats.num_rows;
+        total_byte_size = file_stats.total_byte_size;
+        for (index, file_column) in
+            file_stats.column_statistics.clone().into_iter().enumerate()
+        {
+            col_stats_set[index].null_count = file_column.null_count;
+            col_stats_set[index].max_value = file_column.max_value;
+            col_stats_set[index].min_value = file_column.min_value;
+            col_stats_set[index].sum_value = file_column.sum_value;
+        }
+
+        // If the number of rows exceeds the limit, we can stop processing
+        // files. This only applies when we know the number of rows. It also
+        // currently ignores tables that have no statistics regarding the
+        // number of rows.
+        let conservative_num_rows = match num_rows {
+            Precision::Exact(nr) => nr,
+            _ => usize::MIN,
+        };
+        if conservative_num_rows <= limit.unwrap_or(usize::MAX) {
+            while let Some(current) = all_files.next().await {
+                let (mut file, file_stats) = current?;
+                file.statistics = Some(Arc::clone(&file_stats));
+                result_files.push(file);
+                if !collect_stats {
+                    continue;
+                }
+
+                // We accumulate the number of rows, total byte size and null
+                // counts across all the files in question. If any file does not
+                // provide any information or provides an inexact value, we demote
+                // the statistic precision to inexact.
+                num_rows = add_row_stats(file_stats.num_rows, num_rows);
+
+                total_byte_size =
+                    add_row_stats(file_stats.total_byte_size, total_byte_size);
+
+                for (file_col_stats, col_stats) in file_stats
+                    .column_statistics
+                    .iter()
+                    .zip(col_stats_set.iter_mut())
+                {
+                    let ColumnStatistics {
+                        null_count: file_nc,
+                        max_value: file_max,
+                        min_value: file_min,
+                        sum_value: file_sum,
+                        distinct_count: _,
+                    } = file_col_stats;
+
+                    col_stats.null_count = add_row_stats(*file_nc, col_stats.null_count);
+                    set_max_if_greater(file_max, &mut col_stats.max_value);
+                    set_min_if_lesser(file_min, &mut col_stats.min_value);
+                    col_stats.sum_value = file_sum.add(&col_stats.sum_value);
+                }
+
+                // If the number of rows exceeds the limit, we can stop processing
+                // files. This only applies when we know the number of rows. It also
+                // currently ignores tables that have no statistics regarding the
+                // number of rows.
+                if num_rows.get_value().unwrap_or(&usize::MIN)
+                    > &limit.unwrap_or(usize::MAX)
+                {
+                    break;
+                }
+            }
+        }
+    };
+
+    let mut statistics = Statistics {
+        num_rows,
+        total_byte_size,
+        column_statistics: col_stats_set,
+    };
+    if all_files.next().await.is_some() {
+        // If we still have files in the stream, it means that the limit kicked
+        // in, and the statistic could have been different had we processed the
+        // files in a different order.
+        statistics = statistics.to_inexact()
+    }
+
+    Ok((result_files, statistics))
+}
+
+/// Generic function to compute statistics across multiple items that have statistics
+fn compute_summary_statistics<T, I>(
+    items: I,
+    file_schema: &SchemaRef,
+    stats_extractor: impl Fn(&T) -> Option<&Statistics>,
+) -> Statistics
+where
+    I: IntoIterator<Item = T>,
+{
+    let size = file_schema.fields().len();
+    let mut col_stats_set = vec![ColumnStatistics::default(); size];
+    let mut num_rows = Precision::<usize>::Absent;
+    let mut total_byte_size = Precision::<usize>::Absent;
+
+    for (idx, item) in items.into_iter().enumerate() {
+        if let Some(item_stats) = stats_extractor(&item) {
+            if idx == 0 {
+                // First item, set values directly
+                num_rows = item_stats.num_rows;
+                total_byte_size = item_stats.total_byte_size;
+                for (index, column_stats) in
+                    item_stats.column_statistics.iter().enumerate()
+                {
+                    col_stats_set[index].null_count = column_stats.null_count;
+                    col_stats_set[index].max_value = column_stats.max_value.clone();
+                    col_stats_set[index].min_value = column_stats.min_value.clone();
+                    col_stats_set[index].sum_value = column_stats.sum_value.clone();
+                }
+                continue;
+            }
+
+            // Accumulate statistics for subsequent items
+            num_rows = add_row_stats(item_stats.num_rows, num_rows);
+            total_byte_size = add_row_stats(item_stats.total_byte_size, total_byte_size);
+
+            for (item_col_stats, col_stats) in item_stats
+                .column_statistics
+                .iter()
+                .zip(col_stats_set.iter_mut())
+            {
+                col_stats.null_count =
+                    add_row_stats(item_col_stats.null_count, col_stats.null_count);
+                set_max_if_greater(&item_col_stats.max_value, &mut col_stats.max_value);
+                set_min_if_lesser(&item_col_stats.min_value, &mut col_stats.min_value);
+                col_stats.sum_value = item_col_stats.sum_value.add(&col_stats.sum_value);
+            }
+        }
+    }
+
+    Statistics {
+        num_rows,
+        total_byte_size,
+        column_statistics: col_stats_set,
+    }
+}
+
+/// Computes the summary statistics for a group of files (`FileGroup`-level statistics).
+///
+/// This function combines statistics from all files in the file group to create
+/// summary statistics. It handles the following aspects:
+/// - Merges row counts and byte sizes across files
+/// - Computes column-level statistics like min/max values
+/// - Maintains appropriate precision information (exact, inexact, absent)
+///
+/// # Parameters
+/// * `file_group` - The group of files to process
+/// * `file_schema` - Schema of the files
+/// * `collect_stats` - Whether to collect statistics (if false, returns original file group)
+///
+/// # Returns
+/// A new file group with summary statistics attached
+pub fn compute_file_group_statistics(
+    file_group: FileGroup,
+    file_schema: SchemaRef,
+    collect_stats: bool,
+) -> Result<FileGroup> {
+    if !collect_stats {
+        return Ok(file_group);
+    }
+
+    let statistics =
+        compute_summary_statistics(file_group.iter(), &file_schema, |file| {
+            file.statistics.as_ref().map(|stats| stats.as_ref())
+        });
+
+    Ok(file_group.with_statistics(statistics))
+}
+
+/// Computes statistics for all files across multiple file groups.
+///
+/// This function:
+/// 1. Computes statistics for each individual file group
+/// 2. Computes summary statistics across all file groups
+/// 3. Optionally marks statistics as inexact
+///
+/// # Parameters
+/// * `file_groups` - Vector of file groups to process
+/// * `file_schema` - Schema of the files
+/// * `collect_stats` - Whether to collect statistics
+/// * `inexact_stats` - Whether to mark the resulting statistics as inexact
+///
+/// # Returns
+/// A tuple containing:
+/// * The processed file groups with their individual statistics attached
+/// * The summary statistics across all file groups, aka all files summary statistics
+pub fn compute_all_files_statistics(
+    file_groups: Vec<FileGroup>,
+    file_schema: SchemaRef,
+    collect_stats: bool,
+    inexact_stats: bool,
+) -> Result<(Vec<FileGroup>, Statistics)> {
+    let mut file_groups_with_stats = Vec::with_capacity(file_groups.len());
+
+    // First compute statistics for each file group
+    for file_group in file_groups {
+        file_groups_with_stats.push(compute_file_group_statistics(
+            file_group,
+            Arc::clone(&file_schema),
+            collect_stats,
+        )?);
+    }
+
+    // Then compute summary statistics across all file groups
+    let mut statistics =
+        compute_summary_statistics(&file_groups_with_stats, &file_schema, |file_group| {
+            file_group.statistics()
+        });
+
+    if inexact_stats {
+        statistics = statistics.to_inexact()
+    }
+
+    Ok((file_groups_with_stats, statistics))
+}
+
+pub(crate) fn add_row_stats(
+    file_num_rows: Precision<usize>,
+    num_rows: Precision<usize>,
+) -> Precision<usize> {
+    match (file_num_rows, &num_rows) {
+        (Precision::Absent, _) => num_rows.to_inexact(),
+        (lhs, Precision::Absent) => lhs.to_inexact(),
+        (lhs, rhs) => lhs.add(rhs),
+    }
+}
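// ---------------------------------------------------------------------------
// Editorial sketch (illustration only, not part of this patch): how the two
// public functions above are meant to compose. `PartitionedFile::new` is
// assumed to take a path and a byte size; treat its exact signature as an
// assumption. Everything else is taken from this patch series.
#[cfg(test)]
mod summary_statistics_usage_sketch {
    use super::*;
    use crate::arrow::datatypes::{DataType, Field, Schema};

    #[test]
    fn file_group_statistics_roll_up() -> Result<()> {
        let schema: SchemaRef =
            Arc::new(Schema::new(vec![Field::new("c1", DataType::Int32, false)]));

        // Two files with unknown statistics: every merged value stays `Absent`.
        let mut group = FileGroup::default();
        group.push(
            PartitionedFile::new("a.parquet", 1024)
                .with_statistics(Arc::new(Statistics::new_unknown(&schema))),
        );
        group.push(
            PartitionedFile::new("b.parquet", 2048)
                .with_statistics(Arc::new(Statistics::new_unknown(&schema))),
        );

        // Per-group statistics are computed first, then the all-groups summary.
        let (groups, summary) =
            compute_all_files_statistics(vec![group], schema, true, false)?;
        assert_eq!(groups.len(), 1);
        assert_eq!(summary.num_rows, Precision::Absent);
        Ok(())
    }
}
// ---------------------------------------------------------------------------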
+/// If the given value is numerically greater than the original maximum value,
+/// return the new maximum value with appropriate exactness information.
+fn set_max_if_greater(
+    max_nominee: &Precision<ScalarValue>,
+    max_value: &mut Precision<ScalarValue>,
+) {
+    match (&max_value, max_nominee) {
+        (Precision::Exact(val1), Precision::Exact(val2)) if val1 < val2 => {
+            *max_value = max_nominee.clone();
+        }
+        (Precision::Exact(val1), Precision::Inexact(val2))
+        | (Precision::Inexact(val1), Precision::Inexact(val2))
+        | (Precision::Inexact(val1), Precision::Exact(val2))
+            if val1 < val2 =>
+        {
+            *max_value = max_nominee.clone().to_inexact();
+        }
+        (Precision::Exact(_), Precision::Absent) => {
+            let exact_max = mem::take(max_value);
+            *max_value = exact_max.to_inexact();
+        }
+        (Precision::Absent, Precision::Exact(_)) => {
+            *max_value = max_nominee.clone().to_inexact();
+        }
+        (Precision::Absent, Precision::Inexact(_)) => {
+            *max_value = max_nominee.clone();
+        }
+        _ => {}
+    }
+}
+
+/// If the given value is numerically lesser than the original minimum value,
+/// return the new minimum value with appropriate exactness information.
+fn set_min_if_lesser(
+    min_nominee: &Precision<ScalarValue>,
+    min_value: &mut Precision<ScalarValue>,
+) {
+    match (&min_value, min_nominee) {
+        (Precision::Exact(val1), Precision::Exact(val2)) if val1 > val2 => {
+            *min_value = min_nominee.clone();
+        }
+        (Precision::Exact(val1), Precision::Inexact(val2))
+        | (Precision::Inexact(val1), Precision::Inexact(val2))
+        | (Precision::Inexact(val1), Precision::Exact(val2))
+            if val1 > val2 =>
+        {
+            *min_value = min_nominee.clone().to_inexact();
+        }
+        (Precision::Exact(_), Precision::Absent) => {
+            let exact_min = mem::take(min_value);
+            *min_value = exact_min.to_inexact();
+        }
+        (Precision::Absent, Precision::Exact(_)) => {
+            *min_value = min_nominee.clone().to_inexact();
+        }
+        (Precision::Absent, Precision::Inexact(_)) => {
+            *min_value = min_nominee.clone();
+        }
+        _ => {}
+    }
+}

From e058b0628ed74f0900dcc11adbb0aa1487c42aa2 Mon Sep 17 00:00:00 2001
From: xudong963
Date: Sun, 30 Mar 2025 16:32:50 +0800
Subject: [PATCH 3/4] add tests

---
 .../core/src/datasource/listing/table.rs     |  11 +-
 datafusion/core/src/datasource/statistics.rs | 180 ++++++++++++++++++
 2 files changed, 184 insertions(+), 7 deletions(-)

diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs
index b8eb5eb1ff5a..1f455df3b9e4 100644
--- a/datafusion/core/src/datasource/listing/table.rs
+++ b/datafusion/core/src/datasource/listing/table.rs
@@ -1252,13 +1252,10 @@ async fn get_files_with_limit(
             }
         }
     }
-    let mut inexact_stats = false;
-    if all_files.next().await.is_some() {
-        // If we still have files in the stream, it means that the limit kicked
-        // in, and the statistic could have been different had we processed the
-        // files in a different order.
-        inexact_stats = true;
-    }
+    // If we still have files in the stream, it means that the limit kicked
+    // in, and the statistic could have been different had we processed the
+    // files in a different order.
+ let inexact_stats = all_files.next().await.is_some(); Ok((file_group, inexact_stats)) } diff --git a/datafusion/core/src/datasource/statistics.rs b/datafusion/core/src/datasource/statistics.rs index cf900afdb990..9069d18f489a 100644 --- a/datafusion/core/src/datasource/statistics.rs +++ b/datafusion/core/src/datasource/statistics.rs @@ -354,3 +354,183 @@ fn set_min_if_lesser( _ => {} } } + +#[cfg(test)] +mod tests { + use super::*; + use crate::arrow::datatypes::{DataType, Field, Schema}; + use datafusion_common::ScalarValue; + use std::sync::Arc; + + #[test] + fn test_compute_summary_statistics_basic() { + // Create a schema with two columns + let schema = Arc::new(Schema::new(vec![ + Field::new("col1", DataType::Int32, false), + Field::new("col2", DataType::Int32, false), + ])); + + // Create items with statistics + let stats1 = Statistics { + num_rows: Precision::Exact(10), + total_byte_size: Precision::Exact(100), + column_statistics: vec![ + ColumnStatistics { + null_count: Precision::Exact(1), + max_value: Precision::Exact(ScalarValue::Int32(Some(100))), + min_value: Precision::Exact(ScalarValue::Int32(Some(1))), + sum_value: Precision::Exact(ScalarValue::Int32(Some(500))), + distinct_count: Precision::Absent, + }, + ColumnStatistics { + null_count: Precision::Exact(2), + max_value: Precision::Exact(ScalarValue::Int32(Some(200))), + min_value: Precision::Exact(ScalarValue::Int32(Some(10))), + sum_value: Precision::Exact(ScalarValue::Int32(Some(1000))), + distinct_count: Precision::Absent, + }, + ], + }; + + let stats2 = Statistics { + num_rows: Precision::Exact(15), + total_byte_size: Precision::Exact(150), + column_statistics: vec![ + ColumnStatistics { + null_count: Precision::Exact(2), + max_value: Precision::Exact(ScalarValue::Int32(Some(120))), + min_value: Precision::Exact(ScalarValue::Int32(Some(-10))), + sum_value: Precision::Exact(ScalarValue::Int32(Some(600))), + distinct_count: Precision::Absent, + }, + ColumnStatistics { + null_count: Precision::Exact(3), + max_value: Precision::Exact(ScalarValue::Int32(Some(180))), + min_value: Precision::Exact(ScalarValue::Int32(Some(5))), + sum_value: Precision::Exact(ScalarValue::Int32(Some(1200))), + distinct_count: Precision::Absent, + }, + ], + }; + + let items = vec![Arc::new(stats1), Arc::new(stats2)]; + + // Call compute_summary_statistics + let summary_stats = + compute_summary_statistics(items, &schema, |item| Some(item.as_ref())); + + // Verify the results + assert_eq!(summary_stats.num_rows, Precision::Exact(25)); // 10 + 15 + assert_eq!(summary_stats.total_byte_size, Precision::Exact(250)); // 100 + 150 + + // Verify column statistics + let col1_stats = &summary_stats.column_statistics[0]; + assert_eq!(col1_stats.null_count, Precision::Exact(3)); // 1 + 2 + assert_eq!( + col1_stats.max_value, + Precision::Exact(ScalarValue::Int32(Some(120))) + ); + assert_eq!( + col1_stats.min_value, + Precision::Exact(ScalarValue::Int32(Some(-10))) + ); + assert_eq!( + col1_stats.sum_value, + Precision::Exact(ScalarValue::Int32(Some(1100))) + ); // 500 + 600 + + let col2_stats = &summary_stats.column_statistics[1]; + assert_eq!(col2_stats.null_count, Precision::Exact(5)); // 2 + 3 + assert_eq!( + col2_stats.max_value, + Precision::Exact(ScalarValue::Int32(Some(200))) + ); + assert_eq!( + col2_stats.min_value, + Precision::Exact(ScalarValue::Int32(Some(5))) + ); + assert_eq!( + col2_stats.sum_value, + Precision::Exact(ScalarValue::Int32(Some(2200))) + ); // 1000 + 1200 + } + + #[test] + fn test_compute_summary_statistics_mixed_precision() 
{
+        // Create a schema with one column
+        let schema = Arc::new(Schema::new(vec![Field::new(
+            "col1",
+            DataType::Int32,
+            false,
+        )]));
+
+        // Create items with different precision levels
+        let stats1 = Statistics {
+            num_rows: Precision::Exact(10),
+            total_byte_size: Precision::Inexact(100),
+            column_statistics: vec![ColumnStatistics {
+                null_count: Precision::Exact(1),
+                max_value: Precision::Exact(ScalarValue::Int32(Some(100))),
+                min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
+                sum_value: Precision::Exact(ScalarValue::Int32(Some(500))),
+                distinct_count: Precision::Absent,
+            }],
+        };
+
+        let stats2 = Statistics {
+            num_rows: Precision::Inexact(15),
+            total_byte_size: Precision::Exact(150),
+            column_statistics: vec![ColumnStatistics {
+                null_count: Precision::Inexact(2),
+                max_value: Precision::Inexact(ScalarValue::Int32(Some(120))),
+                min_value: Precision::Exact(ScalarValue::Int32(Some(-10))),
+                sum_value: Precision::Absent,
+                distinct_count: Precision::Absent,
+            }],
+        };
+
+        let items = vec![Arc::new(stats1), Arc::new(stats2)];
+
+        let summary_stats =
+            compute_summary_statistics(items, &schema, |item| Some(item.as_ref()));
+
+        assert_eq!(summary_stats.num_rows, Precision::Inexact(25));
+        assert_eq!(summary_stats.total_byte_size, Precision::Inexact(250));
+
+        let col_stats = &summary_stats.column_statistics[0];
+        assert_eq!(col_stats.null_count, Precision::Inexact(3));
+        assert_eq!(
+            col_stats.max_value,
+            Precision::Inexact(ScalarValue::Int32(Some(120)))
+        );
+        assert_eq!(
+            col_stats.min_value,
+            Precision::Inexact(ScalarValue::Int32(Some(-10)))
+        );
+        assert!(matches!(col_stats.sum_value, Precision::Absent));
+    }
+
+    #[test]
+    fn test_compute_summary_statistics_empty() {
+        let schema = Arc::new(Schema::new(vec![Field::new(
+            "col1",
+            DataType::Int32,
+            false,
+        )]));
+
+        // Empty collection
+        let items: Vec<Arc<Statistics>> = vec![];
+
+        let summary_stats =
+            compute_summary_statistics(items, &schema, |item| Some(item.as_ref()));
+
+        // Verify default values for empty collection
+        assert_eq!(summary_stats.num_rows, Precision::Absent);
+        assert_eq!(summary_stats.total_byte_size, Precision::Absent);
+        assert_eq!(summary_stats.column_statistics.len(), 1);
+        assert_eq!(
+            summary_stats.column_statistics[0].null_count,
+            Precision::Absent
+        );
+    }
+}

From e7754933d9dd7f6cae024bf208bf120e381088f9 Mon Sep 17 00:00:00 2001
From: xudong963
Date: Tue, 1 Apr 2025 13:11:14 +0800
Subject: [PATCH 4/4] fix conflicts

---
 .../core/src/datasource/listing/table.rs      |   8 +-
 datafusion/core/src/datasource/mod.rs         |   1 -
 datafusion/core/src/datasource/statistics.rs  | 536 ------------------
 datafusion/datasource/src/mod.rs              |   3 +-
 datafusion/datasource/src/statistics.rs       | 326 ++++++++++-
 5 files changed, 328 insertions(+), 546 deletions(-)
 delete mode 100644 datafusion/core/src/datasource/statistics.rs

diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs
index 1f455df3b9e4..79db5ecf5229 100644
--- a/datafusion/core/src/datasource/listing/table.rs
+++ b/datafusion/core/src/datasource/listing/table.rs
@@ -51,10 +51,11 @@ use datafusion_physical_expr::{
     create_physical_expr, LexOrdering, PhysicalSortRequirement,
 };
 
-use crate::datasource::statistics::compute_all_files_statistics;
 use async_trait::async_trait;
 use datafusion_catalog::Session;
 use datafusion_common::stats::Precision;
+use datafusion_datasource::add_row_stats;
+use datafusion_datasource::compute_all_files_statistics;
 use datafusion_datasource::file_groups::FileGroup;
 use
datafusion_physical_expr_common::sort_expr::LexRequirement; use futures::{future, stream, Stream, StreamExt, TryStreamExt}; @@ -1233,10 +1234,7 @@ async fn get_files_with_limit( // provide any information or provides an inexact value, we demote // the statistic precision to inexact. if let Some(file_stats) = &file.statistics { - num_rows = crate::datasource::statistics::add_row_stats( - num_rows, - file_stats.num_rows, - ); + num_rows = add_row_stats(num_rows, file_stats.num_rows); } file_group.push(file); diff --git a/datafusion/core/src/datasource/mod.rs b/datafusion/core/src/datasource/mod.rs index a195c9a882dd..a15b2b6ffe13 100644 --- a/datafusion/core/src/datasource/mod.rs +++ b/datafusion/core/src/datasource/mod.rs @@ -42,7 +42,6 @@ pub use datafusion_catalog::default_table_source; pub use datafusion_catalog::memory; pub use datafusion_catalog::stream; pub use datafusion_catalog::view; -pub use datafusion_datasource::get_statistics_with_limit; pub use datafusion_datasource::schema_adapter; pub use datafusion_datasource::sink; pub use datafusion_datasource::source; diff --git a/datafusion/core/src/datasource/statistics.rs b/datafusion/core/src/datasource/statistics.rs deleted file mode 100644 index 9069d18f489a..000000000000 --- a/datafusion/core/src/datasource/statistics.rs +++ /dev/null @@ -1,536 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -use crate::arrow::datatypes::SchemaRef; -use crate::error::Result; -use crate::physical_plan::{ColumnStatistics, Statistics}; -use datafusion_common::stats::Precision; -use datafusion_common::ScalarValue; -use datafusion_datasource::file_groups::FileGroup; -use datafusion_datasource::PartitionedFile; -use futures::{Stream, StreamExt}; -use std::mem; -use std::sync::Arc; - -/// Get all files as well as the file level summary statistics (no statistic for partition columns). -/// If the optional `limit` is provided, includes only sufficient files. Needed to read up to -/// `limit` number of rows. `collect_stats` is passed down from the configuration parameter on -/// `ListingTable`. If it is false we only construct bare statistics and skip a potentially expensive -/// call to `multiunzip` for constructing file level summary statistics. -#[deprecated( - since = "47.0.0", - note = "Please use `get_files_with_limit` and `compute_all_files_statistics` instead" -)] -#[allow(unused)] -pub async fn get_statistics_with_limit( - all_files: impl Stream)>>, - file_schema: SchemaRef, - limit: Option, - collect_stats: bool, -) -> Result<(FileGroup, Statistics)> { - let mut result_files = FileGroup::default(); - // These statistics can be calculated as long as at least one file provides - // useful information. 
If none of the files provides any information, then - // they will end up having `Precision::Absent` values. Throughout calculations, - // missing values will be imputed as: - // - zero for summations, and - // - neutral element for extreme points. - let size = file_schema.fields().len(); - let mut col_stats_set = vec![ColumnStatistics::default(); size]; - let mut num_rows = Precision::::Absent; - let mut total_byte_size = Precision::::Absent; - - // Fusing the stream allows us to call next safely even once it is finished. - let mut all_files = Box::pin(all_files.fuse()); - - if let Some(first_file) = all_files.next().await { - let (mut file, file_stats) = first_file?; - file.statistics = Some(Arc::clone(&file_stats)); - result_files.push(file); - - // First file, we set them directly from the file statistics. - num_rows = file_stats.num_rows; - total_byte_size = file_stats.total_byte_size; - for (index, file_column) in - file_stats.column_statistics.clone().into_iter().enumerate() - { - col_stats_set[index].null_count = file_column.null_count; - col_stats_set[index].max_value = file_column.max_value; - col_stats_set[index].min_value = file_column.min_value; - col_stats_set[index].sum_value = file_column.sum_value; - } - - // If the number of rows exceeds the limit, we can stop processing - // files. This only applies when we know the number of rows. It also - // currently ignores tables that have no statistics regarding the - // number of rows. - let conservative_num_rows = match num_rows { - Precision::Exact(nr) => nr, - _ => usize::MIN, - }; - if conservative_num_rows <= limit.unwrap_or(usize::MAX) { - while let Some(current) = all_files.next().await { - let (mut file, file_stats) = current?; - file.statistics = Some(Arc::clone(&file_stats)); - result_files.push(file); - if !collect_stats { - continue; - } - - // We accumulate the number of rows, total byte size and null - // counts across all the files in question. If any file does not - // provide any information or provides an inexact value, we demote - // the statistic precision to inexact. - num_rows = add_row_stats(file_stats.num_rows, num_rows); - - total_byte_size = - add_row_stats(file_stats.total_byte_size, total_byte_size); - - for (file_col_stats, col_stats) in file_stats - .column_statistics - .iter() - .zip(col_stats_set.iter_mut()) - { - let ColumnStatistics { - null_count: file_nc, - max_value: file_max, - min_value: file_min, - sum_value: file_sum, - distinct_count: _, - } = file_col_stats; - - col_stats.null_count = add_row_stats(*file_nc, col_stats.null_count); - set_max_if_greater(file_max, &mut col_stats.max_value); - set_min_if_lesser(file_min, &mut col_stats.min_value); - col_stats.sum_value = file_sum.add(&col_stats.sum_value); - } - - // If the number of rows exceeds the limit, we can stop processing - // files. This only applies when we know the number of rows. It also - // currently ignores tables that have no statistics regarding the - // number of rows. - if num_rows.get_value().unwrap_or(&usize::MIN) - > &limit.unwrap_or(usize::MAX) - { - break; - } - } - } - }; - - let mut statistics = Statistics { - num_rows, - total_byte_size, - column_statistics: col_stats_set, - }; - if all_files.next().await.is_some() { - // If we still have files in the stream, it means that the limit kicked - // in, and the statistic could have been different had we processed the - // files in a different order. 
- statistics = statistics.to_inexact() - } - - Ok((result_files, statistics)) -} - -/// Generic function to compute statistics across multiple items that have statistics -fn compute_summary_statistics( - items: I, - file_schema: &SchemaRef, - stats_extractor: impl Fn(&T) -> Option<&Statistics>, -) -> Statistics -where - I: IntoIterator, -{ - let size = file_schema.fields().len(); - let mut col_stats_set = vec![ColumnStatistics::default(); size]; - let mut num_rows = Precision::::Absent; - let mut total_byte_size = Precision::::Absent; - - for (idx, item) in items.into_iter().enumerate() { - if let Some(item_stats) = stats_extractor(&item) { - if idx == 0 { - // First item, set values directly - num_rows = item_stats.num_rows; - total_byte_size = item_stats.total_byte_size; - for (index, column_stats) in - item_stats.column_statistics.iter().enumerate() - { - col_stats_set[index].null_count = column_stats.null_count; - col_stats_set[index].max_value = column_stats.max_value.clone(); - col_stats_set[index].min_value = column_stats.min_value.clone(); - col_stats_set[index].sum_value = column_stats.sum_value.clone(); - } - continue; - } - - // Accumulate statistics for subsequent items - num_rows = add_row_stats(item_stats.num_rows, num_rows); - total_byte_size = add_row_stats(item_stats.total_byte_size, total_byte_size); - - for (item_col_stats, col_stats) in item_stats - .column_statistics - .iter() - .zip(col_stats_set.iter_mut()) - { - col_stats.null_count = - add_row_stats(item_col_stats.null_count, col_stats.null_count); - set_max_if_greater(&item_col_stats.max_value, &mut col_stats.max_value); - set_min_if_lesser(&item_col_stats.min_value, &mut col_stats.min_value); - col_stats.sum_value = item_col_stats.sum_value.add(&col_stats.sum_value); - } - } - } - - Statistics { - num_rows, - total_byte_size, - column_statistics: col_stats_set, - } -} - -/// Computes the summary statistics for a group of files(`FileGroup` level's statistics). -/// -/// This function combines statistics from all files in the file group to create -/// summary statistics. It handles the following aspects: -/// - Merges row counts and byte sizes across files -/// - Computes column-level statistics like min/max values -/// - Maintains appropriate precision information (exact, inexact, absent) -/// -/// # Parameters -/// * `file_group` - The group of files to process -/// * `file_schema` - Schema of the files -/// * `collect_stats` - Whether to collect statistics (if false, returns original file group) -/// -/// # Returns -/// A new file group with summary statistics attached -pub fn compute_file_group_statistics( - file_group: FileGroup, - file_schema: SchemaRef, - collect_stats: bool, -) -> Result { - if !collect_stats { - return Ok(file_group); - } - - let statistics = - compute_summary_statistics(file_group.iter(), &file_schema, |file| { - file.statistics.as_ref().map(|stats| stats.as_ref()) - }); - - Ok(file_group.with_statistics(statistics)) -} - -/// Computes statistics for all files across multiple file groups. -/// -/// This function: -/// 1. Computes statistics for each individual file group -/// 2. Summary statistics across all file groups -/// 3. 
Optionally marks statistics as inexact -/// -/// # Parameters -/// * `file_groups` - Vector of file groups to process -/// * `file_schema` - Schema of the files -/// * `collect_stats` - Whether to collect statistics -/// * `inexact_stats` - Whether to mark the resulting statistics as inexact -/// -/// # Returns -/// A tuple containing: -/// * The processed file groups with their individual statistics attached -/// * The summary statistics across all file groups, aka all files summary statistics -pub fn compute_all_files_statistics( - file_groups: Vec, - file_schema: SchemaRef, - collect_stats: bool, - inexact_stats: bool, -) -> Result<(Vec, Statistics)> { - let mut file_groups_with_stats = Vec::with_capacity(file_groups.len()); - - // First compute statistics for each file group - for file_group in file_groups { - file_groups_with_stats.push(compute_file_group_statistics( - file_group, - Arc::clone(&file_schema), - collect_stats, - )?); - } - - // Then summary statistics across all file groups - let mut statistics = - compute_summary_statistics(&file_groups_with_stats, &file_schema, |file_group| { - file_group.statistics() - }); - - if inexact_stats { - statistics = statistics.to_inexact() - } - - Ok((file_groups_with_stats, statistics)) -} - -pub(crate) fn add_row_stats( - file_num_rows: Precision, - num_rows: Precision, -) -> Precision { - match (file_num_rows, &num_rows) { - (Precision::Absent, _) => num_rows.to_inexact(), - (lhs, Precision::Absent) => lhs.to_inexact(), - (lhs, rhs) => lhs.add(rhs), - } -} - -/// If the given value is numerically greater than the original maximum value, -/// return the new maximum value with appropriate exactness information. -fn set_max_if_greater( - max_nominee: &Precision, - max_value: &mut Precision, -) { - match (&max_value, max_nominee) { - (Precision::Exact(val1), Precision::Exact(val2)) if val1 < val2 => { - *max_value = max_nominee.clone(); - } - (Precision::Exact(val1), Precision::Inexact(val2)) - | (Precision::Inexact(val1), Precision::Inexact(val2)) - | (Precision::Inexact(val1), Precision::Exact(val2)) - if val1 < val2 => - { - *max_value = max_nominee.clone().to_inexact(); - } - (Precision::Exact(_), Precision::Absent) => { - let exact_max = mem::take(max_value); - *max_value = exact_max.to_inexact(); - } - (Precision::Absent, Precision::Exact(_)) => { - *max_value = max_nominee.clone().to_inexact(); - } - (Precision::Absent, Precision::Inexact(_)) => { - *max_value = max_nominee.clone(); - } - _ => {} - } -} - -/// If the given value is numerically lesser than the original minimum value, -/// return the new minimum value with appropriate exactness information. 
-fn set_min_if_lesser( - min_nominee: &Precision, - min_value: &mut Precision, -) { - match (&min_value, min_nominee) { - (Precision::Exact(val1), Precision::Exact(val2)) if val1 > val2 => { - *min_value = min_nominee.clone(); - } - (Precision::Exact(val1), Precision::Inexact(val2)) - | (Precision::Inexact(val1), Precision::Inexact(val2)) - | (Precision::Inexact(val1), Precision::Exact(val2)) - if val1 > val2 => - { - *min_value = min_nominee.clone().to_inexact(); - } - (Precision::Exact(_), Precision::Absent) => { - let exact_min = mem::take(min_value); - *min_value = exact_min.to_inexact(); - } - (Precision::Absent, Precision::Exact(_)) => { - *min_value = min_nominee.clone().to_inexact(); - } - (Precision::Absent, Precision::Inexact(_)) => { - *min_value = min_nominee.clone(); - } - _ => {} - } -} - -#[cfg(test)] -mod tests { - use super::*; - use crate::arrow::datatypes::{DataType, Field, Schema}; - use datafusion_common::ScalarValue; - use std::sync::Arc; - - #[test] - fn test_compute_summary_statistics_basic() { - // Create a schema with two columns - let schema = Arc::new(Schema::new(vec![ - Field::new("col1", DataType::Int32, false), - Field::new("col2", DataType::Int32, false), - ])); - - // Create items with statistics - let stats1 = Statistics { - num_rows: Precision::Exact(10), - total_byte_size: Precision::Exact(100), - column_statistics: vec![ - ColumnStatistics { - null_count: Precision::Exact(1), - max_value: Precision::Exact(ScalarValue::Int32(Some(100))), - min_value: Precision::Exact(ScalarValue::Int32(Some(1))), - sum_value: Precision::Exact(ScalarValue::Int32(Some(500))), - distinct_count: Precision::Absent, - }, - ColumnStatistics { - null_count: Precision::Exact(2), - max_value: Precision::Exact(ScalarValue::Int32(Some(200))), - min_value: Precision::Exact(ScalarValue::Int32(Some(10))), - sum_value: Precision::Exact(ScalarValue::Int32(Some(1000))), - distinct_count: Precision::Absent, - }, - ], - }; - - let stats2 = Statistics { - num_rows: Precision::Exact(15), - total_byte_size: Precision::Exact(150), - column_statistics: vec![ - ColumnStatistics { - null_count: Precision::Exact(2), - max_value: Precision::Exact(ScalarValue::Int32(Some(120))), - min_value: Precision::Exact(ScalarValue::Int32(Some(-10))), - sum_value: Precision::Exact(ScalarValue::Int32(Some(600))), - distinct_count: Precision::Absent, - }, - ColumnStatistics { - null_count: Precision::Exact(3), - max_value: Precision::Exact(ScalarValue::Int32(Some(180))), - min_value: Precision::Exact(ScalarValue::Int32(Some(5))), - sum_value: Precision::Exact(ScalarValue::Int32(Some(1200))), - distinct_count: Precision::Absent, - }, - ], - }; - - let items = vec![Arc::new(stats1), Arc::new(stats2)]; - - // Call compute_summary_statistics - let summary_stats = - compute_summary_statistics(items, &schema, |item| Some(item.as_ref())); - - // Verify the results - assert_eq!(summary_stats.num_rows, Precision::Exact(25)); // 10 + 15 - assert_eq!(summary_stats.total_byte_size, Precision::Exact(250)); // 100 + 150 - - // Verify column statistics - let col1_stats = &summary_stats.column_statistics[0]; - assert_eq!(col1_stats.null_count, Precision::Exact(3)); // 1 + 2 - assert_eq!( - col1_stats.max_value, - Precision::Exact(ScalarValue::Int32(Some(120))) - ); - assert_eq!( - col1_stats.min_value, - Precision::Exact(ScalarValue::Int32(Some(-10))) - ); - assert_eq!( - col1_stats.sum_value, - Precision::Exact(ScalarValue::Int32(Some(1100))) - ); // 500 + 600 - - let col2_stats = &summary_stats.column_statistics[1]; - 
assert_eq!(col2_stats.null_count, Precision::Exact(5)); // 2 + 3 - assert_eq!( - col2_stats.max_value, - Precision::Exact(ScalarValue::Int32(Some(200))) - ); - assert_eq!( - col2_stats.min_value, - Precision::Exact(ScalarValue::Int32(Some(5))) - ); - assert_eq!( - col2_stats.sum_value, - Precision::Exact(ScalarValue::Int32(Some(2200))) - ); // 1000 + 1200 - } - - #[test] - fn test_compute_summary_statistics_mixed_precision() { - // Create a schema with one column - let schema = Arc::new(Schema::new(vec![Field::new( - "col1", - DataType::Int32, - false, - )])); - - // Create items with different precision levels - let stats1 = Statistics { - num_rows: Precision::Exact(10), - total_byte_size: Precision::Inexact(100), - column_statistics: vec![ColumnStatistics { - null_count: Precision::Exact(1), - max_value: Precision::Exact(ScalarValue::Int32(Some(100))), - min_value: Precision::Inexact(ScalarValue::Int32(Some(1))), - sum_value: Precision::Exact(ScalarValue::Int32(Some(500))), - distinct_count: Precision::Absent, - }], - }; - - let stats2 = Statistics { - num_rows: Precision::Inexact(15), - total_byte_size: Precision::Exact(150), - column_statistics: vec![ColumnStatistics { - null_count: Precision::Inexact(2), - max_value: Precision::Inexact(ScalarValue::Int32(Some(120))), - min_value: Precision::Exact(ScalarValue::Int32(Some(-10))), - sum_value: Precision::Absent, - distinct_count: Precision::Absent, - }], - }; - - let items = vec![Arc::new(stats1), Arc::new(stats2)]; - - let summary_stats = - compute_summary_statistics(items, &schema, |item| Some(item.as_ref())); - - assert_eq!(summary_stats.num_rows, Precision::Inexact(25)); - assert_eq!(summary_stats.total_byte_size, Precision::Inexact(250)); - - let col_stats = &summary_stats.column_statistics[0]; - assert_eq!(col_stats.null_count, Precision::Inexact(3)); - assert_eq!( - col_stats.max_value, - Precision::Inexact(ScalarValue::Int32(Some(120))) - ); - assert_eq!( - col_stats.min_value, - Precision::Inexact(ScalarValue::Int32(Some(-10))) - ); - assert!(matches!(col_stats.sum_value, Precision::Absent)); - } - - #[test] - fn test_compute_summary_statistics_empty() { - let schema = Arc::new(Schema::new(vec![Field::new( - "col1", - DataType::Int32, - false, - )])); - - // Empty collection - let items: Vec> = vec![]; - - let summary_stats = - compute_summary_statistics(items, &schema, |item| Some(item.as_ref())); - - // Verify default values for empty collection - assert_eq!(summary_stats.num_rows, Precision::Absent); - assert_eq!(summary_stats.total_byte_size, Precision::Absent); - assert_eq!(summary_stats.column_statistics.len(), 1); - assert_eq!( - summary_stats.column_statistics[0].null_count, - Precision::Absent - ); - } -} diff --git a/datafusion/datasource/src/mod.rs b/datafusion/datasource/src/mod.rs index 954cd582bf9a..e4461c0b90a4 100644 --- a/datafusion/datasource/src/mod.rs +++ b/datafusion/datasource/src/mod.rs @@ -60,7 +60,8 @@ use std::pin::Pin; use std::sync::Arc; pub use self::url::ListingTableUrl; -pub use statistics::get_statistics_with_limit; +pub use statistics::add_row_stats; +pub use statistics::compute_all_files_statistics; /// Stream of files get listed from object store pub type PartitionedFileStream = diff --git a/datafusion/datasource/src/statistics.rs b/datafusion/datasource/src/statistics.rs index 801315568a0d..7e875513f03f 100644 --- a/datafusion/datasource/src/statistics.rs +++ b/datafusion/datasource/src/statistics.rs @@ -293,6 +293,11 @@ fn sort_columns_from_physical_sort_exprs( /// `limit` number of rows. 
`collect_stats` is passed down from the configuration parameter on
 /// `ListingTable`. If it is false we only construct bare statistics and skip a potentially expensive
 /// call to `multiunzip` for constructing file level summary statistics.
+#[deprecated(
+    since = "47.0.0",
+    note = "Please use `get_files_with_limit` and `compute_all_files_statistics` instead"
+)]
+#[allow(unused)]
 pub async fn get_statistics_with_limit(
     all_files: impl Stream<Item = Result<(PartitionedFile, Arc<Statistics>)>>,
     file_schema: SchemaRef,
     limit: Option<usize>,
     collect_stats: bool,
@@ -316,7 +321,7 @@ pub async fn get_statistics_with_limit(
 
     if let Some(first_file) = all_files.next().await {
         let (mut file, file_stats) = first_file?;
-        file.statistics = Some(file_stats.as_ref().clone());
+        file.statistics = Some(Arc::clone(&file_stats));
         result_files.push(file);
 
         // First file, we set them directly from the file statistics.
@@ -342,7 +347,7 @@ pub async fn get_statistics_with_limit(
     if conservative_num_rows <= limit.unwrap_or(usize::MAX) {
         while let Some(current) = all_files.next().await {
             let (mut file, file_stats) = current?;
-            file.statistics = Some(file_stats.as_ref().clone());
+            file.statistics = Some(Arc::clone(&file_stats));
             result_files.push(file);
             if !collect_stats {
                 continue;
@@ -404,7 +409,142 @@ pub async fn get_statistics_with_limit(
     Ok((result_files, statistics))
 }
 
+/// Generic function to compute statistics across multiple items that have statistics
+fn compute_summary_statistics<T, I>(
+    items: I,
+    file_schema: &SchemaRef,
+    stats_extractor: impl Fn(&T) -> Option<&Statistics>,
+) -> Statistics
+where
+    I: IntoIterator<Item = T>,
+{
+    let size = file_schema.fields().len();
+    let mut col_stats_set = vec![ColumnStatistics::default(); size];
+    let mut num_rows = Precision::<usize>::Absent;
+    let mut total_byte_size = Precision::<usize>::Absent;
+
+    for (idx, item) in items.into_iter().enumerate() {
+        if let Some(item_stats) = stats_extractor(&item) {
+            if idx == 0 {
+                // First item, set values directly
+                num_rows = item_stats.num_rows;
+                total_byte_size = item_stats.total_byte_size;
+                for (index, column_stats) in
+                    item_stats.column_statistics.iter().enumerate()
+                {
+                    col_stats_set[index].null_count = column_stats.null_count;
+                    col_stats_set[index].max_value = column_stats.max_value.clone();
+                    col_stats_set[index].min_value = column_stats.min_value.clone();
+                    col_stats_set[index].sum_value = column_stats.sum_value.clone();
+                }
+                continue;
+            }
+
+            // Accumulate statistics for subsequent items
+            num_rows = add_row_stats(item_stats.num_rows, num_rows);
+            total_byte_size = add_row_stats(item_stats.total_byte_size, total_byte_size);
+
+            for (item_col_stats, col_stats) in item_stats
+                .column_statistics
+                .iter()
+                .zip(col_stats_set.iter_mut())
+            {
+                col_stats.null_count =
+                    add_row_stats(item_col_stats.null_count, col_stats.null_count);
+                set_max_if_greater(&item_col_stats.max_value, &mut col_stats.max_value);
+                set_min_if_lesser(&item_col_stats.min_value, &mut col_stats.min_value);
+                col_stats.sum_value = item_col_stats.sum_value.add(&col_stats.sum_value);
+            }
+        }
+    }
+
+    Statistics {
+        num_rows,
+        total_byte_size,
+        column_statistics: col_stats_set,
+    }
+}
+
+/// Computes the summary statistics for a group of files (`FileGroup`-level statistics).
+///
+/// This function combines statistics from all files in the file group to create
+/// summary statistics. It handles the following aspects:
+/// - Merges row counts and byte sizes across files
+/// - Computes column-level statistics like min/max values
+/// - Maintains appropriate precision information (exact, inexact, absent)
+///
+/// # Parameters
+/// * `file_group` - The group of files to process
+/// * `file_schema` - Schema of the files
+/// * `collect_stats` - Whether to collect statistics (if false, returns original file group)
+///
+/// # Returns
+/// A new file group with summary statistics attached
+pub fn compute_file_group_statistics(
+    file_group: FileGroup,
+    file_schema: SchemaRef,
+    collect_stats: bool,
+) -> Result<FileGroup> {
+    if !collect_stats {
+        return Ok(file_group);
+    }
+
+    let statistics =
+        compute_summary_statistics(file_group.iter(), &file_schema, |file| {
+            file.statistics.as_ref().map(|stats| stats.as_ref())
+        });
+
+    Ok(file_group.with_statistics(statistics))
+}
+
+/// Computes statistics for all files across multiple file groups.
+///
+/// This function:
+/// 1. Computes statistics for each individual file group
+/// 2. Computes summary statistics across all file groups
+/// 3. Optionally marks statistics as inexact
+///
+/// # Parameters
+/// * `file_groups` - Vector of file groups to process
+/// * `file_schema` - Schema of the files
+/// * `collect_stats` - Whether to collect statistics
+/// * `inexact_stats` - Whether to mark the resulting statistics as inexact
+///
+/// # Returns
+/// A tuple containing:
+/// * The processed file groups with their individual statistics attached
+/// * The summary statistics across all file groups, aka all files summary statistics
+pub fn compute_all_files_statistics(
+    file_groups: Vec<FileGroup>,
+    file_schema: SchemaRef,
+    collect_stats: bool,
+    inexact_stats: bool,
+) -> Result<(Vec<FileGroup>, Statistics)> {
+    let mut file_groups_with_stats = Vec::with_capacity(file_groups.len());
+
+    // First compute statistics for each file group
+    for file_group in file_groups {
+        file_groups_with_stats.push(compute_file_group_statistics(
+            file_group,
+            Arc::clone(&file_schema),
+            collect_stats,
+        )?);
+    }
+
+    // Then compute summary statistics across all file groups
+    let mut statistics =
+        compute_summary_statistics(&file_groups_with_stats, &file_schema, |file_group| {
+            file_group.statistics()
+        });
+
+    if inexact_stats {
+        statistics = statistics.to_inexact()
+    }
+
+    Ok((file_groups_with_stats, statistics))
+}
+
-fn add_row_stats(
+pub fn add_row_stats(
     file_num_rows: Precision<usize>,
     num_rows: Precision<usize>,
 ) -> Precision<usize> {
     match (file_num_rows, &num_rows) {
         (Precision::Absent, _) => num_rows.to_inexact(),
         (lhs, Precision::Absent) => lhs.to_inexact(),
         (lhs, rhs) => lhs.add(rhs),
     }
 }
@@ -476,3 +616,183 @@ fn set_min_if_lesser(
         _ => {}
     }
 }
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use arrow::datatypes::{DataType, Field, Schema};
+    use datafusion_common::ScalarValue;
+    use std::sync::Arc;
+
+    #[test]
+    fn test_compute_summary_statistics_basic() {
+        // Create a schema with two columns
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("col1", DataType::Int32, false),
+            Field::new("col2", DataType::Int32, false),
+        ]));
+
+        // Create items with statistics
+        let stats1 = Statistics {
+            num_rows: Precision::Exact(10),
+            total_byte_size: Precision::Exact(100),
+            column_statistics: vec![
+                ColumnStatistics {
+                    null_count: Precision::Exact(1),
+                    max_value: Precision::Exact(ScalarValue::Int32(Some(100))),
+                    min_value: Precision::Exact(ScalarValue::Int32(Some(1))),
+                    sum_value: Precision::Exact(ScalarValue::Int32(Some(500))),
+                    distinct_count: Precision::Absent,
+                },
+                ColumnStatistics {
+                    null_count: Precision::Exact(2),
+                    max_value: Precision::Exact(ScalarValue::Int32(Some(200))),
+                    min_value: Precision::Exact(ScalarValue::Int32(Some(10))),
+                    sum_value:
Precision::Exact(ScalarValue::Int32(Some(1000))), + distinct_count: Precision::Absent, + }, + ], + }; + + let stats2 = Statistics { + num_rows: Precision::Exact(15), + total_byte_size: Precision::Exact(150), + column_statistics: vec![ + ColumnStatistics { + null_count: Precision::Exact(2), + max_value: Precision::Exact(ScalarValue::Int32(Some(120))), + min_value: Precision::Exact(ScalarValue::Int32(Some(-10))), + sum_value: Precision::Exact(ScalarValue::Int32(Some(600))), + distinct_count: Precision::Absent, + }, + ColumnStatistics { + null_count: Precision::Exact(3), + max_value: Precision::Exact(ScalarValue::Int32(Some(180))), + min_value: Precision::Exact(ScalarValue::Int32(Some(5))), + sum_value: Precision::Exact(ScalarValue::Int32(Some(1200))), + distinct_count: Precision::Absent, + }, + ], + }; + + let items = vec![Arc::new(stats1), Arc::new(stats2)]; + + // Call compute_summary_statistics + let summary_stats = + compute_summary_statistics(items, &schema, |item| Some(item.as_ref())); + + // Verify the results + assert_eq!(summary_stats.num_rows, Precision::Exact(25)); // 10 + 15 + assert_eq!(summary_stats.total_byte_size, Precision::Exact(250)); // 100 + 150 + + // Verify column statistics + let col1_stats = &summary_stats.column_statistics[0]; + assert_eq!(col1_stats.null_count, Precision::Exact(3)); // 1 + 2 + assert_eq!( + col1_stats.max_value, + Precision::Exact(ScalarValue::Int32(Some(120))) + ); + assert_eq!( + col1_stats.min_value, + Precision::Exact(ScalarValue::Int32(Some(-10))) + ); + assert_eq!( + col1_stats.sum_value, + Precision::Exact(ScalarValue::Int32(Some(1100))) + ); // 500 + 600 + + let col2_stats = &summary_stats.column_statistics[1]; + assert_eq!(col2_stats.null_count, Precision::Exact(5)); // 2 + 3 + assert_eq!( + col2_stats.max_value, + Precision::Exact(ScalarValue::Int32(Some(200))) + ); + assert_eq!( + col2_stats.min_value, + Precision::Exact(ScalarValue::Int32(Some(5))) + ); + assert_eq!( + col2_stats.sum_value, + Precision::Exact(ScalarValue::Int32(Some(2200))) + ); // 1000 + 1200 + } + + #[test] + fn test_compute_summary_statistics_mixed_precision() { + // Create a schema with one column + let schema = Arc::new(Schema::new(vec![Field::new( + "col1", + DataType::Int32, + false, + )])); + + // Create items with different precision levels + let stats1 = Statistics { + num_rows: Precision::Exact(10), + total_byte_size: Precision::Inexact(100), + column_statistics: vec![ColumnStatistics { + null_count: Precision::Exact(1), + max_value: Precision::Exact(ScalarValue::Int32(Some(100))), + min_value: Precision::Inexact(ScalarValue::Int32(Some(1))), + sum_value: Precision::Exact(ScalarValue::Int32(Some(500))), + distinct_count: Precision::Absent, + }], + }; + + let stats2 = Statistics { + num_rows: Precision::Inexact(15), + total_byte_size: Precision::Exact(150), + column_statistics: vec![ColumnStatistics { + null_count: Precision::Inexact(2), + max_value: Precision::Inexact(ScalarValue::Int32(Some(120))), + min_value: Precision::Exact(ScalarValue::Int32(Some(-10))), + sum_value: Precision::Absent, + distinct_count: Precision::Absent, + }], + }; + + let items = vec![Arc::new(stats1), Arc::new(stats2)]; + + let summary_stats = + compute_summary_statistics(items, &schema, |item| Some(item.as_ref())); + + assert_eq!(summary_stats.num_rows, Precision::Inexact(25)); + assert_eq!(summary_stats.total_byte_size, Precision::Inexact(250)); + + let col_stats = &summary_stats.column_statistics[0]; + assert_eq!(col_stats.null_count, Precision::Inexact(3)); + assert_eq!( + 
col_stats.max_value,
+            Precision::Inexact(ScalarValue::Int32(Some(120)))
+        );
+        assert_eq!(
+            col_stats.min_value,
+            Precision::Inexact(ScalarValue::Int32(Some(-10)))
+        );
+        assert!(matches!(col_stats.sum_value, Precision::Absent));
+    }
+
+    #[test]
+    fn test_compute_summary_statistics_empty() {
+        let schema = Arc::new(Schema::new(vec![Field::new(
+            "col1",
+            DataType::Int32,
+            false,
+        )]));
+
+        // Empty collection
+        let items: Vec<Arc<Statistics>> = vec![];
+
+        let summary_stats =
+            compute_summary_statistics(items, &schema, |item| Some(item.as_ref()));
+
+        // Verify default values for empty collection
+        assert_eq!(summary_stats.num_rows, Precision::Absent);
+        assert_eq!(summary_stats.total_byte_size, Precision::Absent);
+        assert_eq!(summary_stats.column_statistics.len(), 1);
+        assert_eq!(
+            summary_stats.column_statistics[0].null_count,
+            Precision::Absent
+        );
+    }
+}
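// ---------------------------------------------------------------------------
// Editorial addendum (illustration only, not part of the patch series): the
// precision-demotion rule implemented by `add_row_stats`. Any `Absent` operand
// demotes the other side to `Inexact`; otherwise `Precision::add` keeps the
// result `Exact` only when both sides are `Exact`. A sketch of the expected
// behavior, assuming `datafusion_common::stats::Precision`:
#[cfg(test)]
mod add_row_stats_sketch {
    use datafusion_common::stats::Precision;

    // Same body as the function made public in PATCH 4/4.
    fn add_row_stats(
        file_num_rows: Precision<usize>,
        num_rows: Precision<usize>,
    ) -> Precision<usize> {
        match (file_num_rows, &num_rows) {
            (Precision::Absent, _) => num_rows.to_inexact(),
            (lhs, Precision::Absent) => lhs.to_inexact(),
            (lhs, rhs) => lhs.add(rhs),
        }
    }

    #[test]
    fn demotion_rules() {
        // Exact + Exact stays exact.
        assert_eq!(
            add_row_stats(Precision::Exact(10), Precision::Exact(5)),
            Precision::Exact(15)
        );
        // An inexact operand demotes the sum.
        assert_eq!(
            add_row_stats(Precision::Inexact(10), Precision::Exact(5)),
            Precision::Inexact(15)
        );
        // An absent operand demotes the other side instead of erasing it.
        assert_eq!(
            add_row_stats(Precision::Absent, Precision::Exact(5)),
            Precision::Inexact(5)
        );
    }
}
// ---------------------------------------------------------------------------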