
Use file statistics in query planning to avoid sorting when unnecessary #7490

@suremarc


Is your feature request related to a problem or challenge?

Related issue: #6672

DataFusion currently cannot avoid sorts when there are more files than there are target_partitions.

When querying data from an object store, DataFusion groups the files into file groups internally, and these file groups are executed concurrently with one another. However, if any file group contains more than one file, DataFusion currently cannot maintain the files' file_sort_order.

To illustrate, suppose we had a table trades consisting of the following files in S3:

❯ mc ls s3/trades/year=2023/month=01 -r
[2023-05-23 15:13:18 CDT] 1.2GiB STANDARD day=03/trades-2023-01-03.parquet
[2023-05-23 15:12:36 CDT] 1.2GiB STANDARD day=04/trades-2023-01-04.parquet
[2023-05-23 15:12:00 CDT] 1.1GiB STANDARD day=05/trades-2023-01-05.parquet
[2023-05-23 15:13:21 CDT] 1.2GiB STANDARD day=06/trades-2023-01-06.parquet
[2023-05-23 15:12:40 CDT] 1.2GiB STANDARD day=09/trades-2023-01-09.parquet

and that each file is sorted by timestamp ASC. If target_partitions is set to 3, the following query:

SELECT * 
FROM trades 
ORDER BY timestamp ASC
LIMIT 50000;

results in the following suboptimal plan:

GlobalLimitExec: skip=0, fetch=50000
  SortPreservingMergeExec: [timestamp@0 ASC NULLS LAST], fetch=50000
    SortExec: fetch=50000, expr=[timestamp@0 ASC NULLS LAST]
      ParquetExec: file_groups={3 groups: [[year=2023/month=01/day=05/trades-2023-01-05.parquet, year=2023/month=01/day=06/trades-2023-01-06.parquet], [year=2023/month=01/day=03/trades-2023-01-03.parquet, year=2023/month=01/day=09/trades-2023-01-09.parquet], [year=2023/month=01/day=04/trades-2023-01-04.parquet]]}, projection=[ticker, timestamp, participant_timestamp, trf_timestamp, sequence_number, conditions, id, price, size, correction, exchange, trf, tape, year, month, day]

DataFusion has decided that this plan needs a sort because some file groups contain multiple files. (The sort could be avoided if there were only 1, or even up to 3, files.) However, in this case we know that trades-2023-01-03.parquet could be streamed before trades-2023-01-04.parquet in timestamp order, because every timestamp in trades-2023-01-03.parquet precedes every timestamp in trades-2023-01-04.parquet. In fact, every file group in the plan above already lists its files in timestamp order ([day=05, day=06], [day=03, day=09], [day=04]), so each partition's output is already sorted and the SortExec is redundant. DataFusion, however, does not know this and currently has no way to know it.
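The non-overlap argument above can be expressed as a simple check over per-file min/max statistics. The sketch below is a minimal illustration, not DataFusion code: FileStats and group_is_ordered are hypothetical names, the sort column is assumed to be stored as i64, each file is assumed to be sorted ascending internally, and NULLs are ignored. A group stays ordered when each file's maximum is no greater than the next file's minimum.

```rust
/// Hypothetical per-file statistics for the sort column (e.g. `timestamp`),
/// with values stored as i64. This is not a DataFusion type.
struct FileStats {
    min: i64,
    max: i64,
}

/// Returns true if reading the files of `group` one after another yields rows
/// in ascending order, assuming each file is itself sorted ascending and has
/// no NULLs in the sort column. Each file's max must not exceed the next
/// file's min (equal boundary values are still in order).
fn group_is_ordered(group: &[FileStats]) -> bool {
    group.windows(2).all(|pair| pair[0].max <= pair[1].min)
}

fn main() {
    // Illustrative ranges only: "day d" covers [d * 100, d * 100 + 99].
    let day = |d: i64| FileStats { min: d * 100, max: d * 100 + 99 };

    // Mirrors the group [day=03, day=09] from the plan above.
    println!("{}", group_is_ordered(&[day(3), day(9)])); // true
    // The same files listed in the opposite order.
    println!("{}", group_is_ordered(&[day(9), day(3)])); // false
}
```

Using `<=` rather than `<` means files that share a boundary value can still be concatenated without breaking ascending order.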

Describe the solution you'd like

Essentially, DataFusion should be able to detect which files are non-overlapping and use this information to distribute files into file groups in a way that still outputs data in order. Below I offer one possible path to doing so, which I believe should be minimally invasive.
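One possible way to distribute files so that every group stays ordered is a greedy first-fit pass. Sort the files by their minimum, then append each file to the first group whose last file ends at or before the new file's minimum, and open a new group when none fits. This is a sketch under stated assumptions, not necessarily the approach DataFusion would adopt: FileStats and first_fit_ordered_groups are hypothetical, the sort column is an i64, and each file is sorted ascending internally.

```rust
/// Hypothetical min/max statistics of the sort column for one file.
struct FileStats {
    min: i64,
    max: i64,
}

/// Greedily packs files into groups such that, within every group, files are
/// non-overlapping and listed in ascending order. Returns indices into `files`.
fn first_fit_ordered_groups(files: &[FileStats]) -> Vec<Vec<usize>> {
    // Visit files in order of their minimum value (stable sort keeps ties in input order).
    let mut order: Vec<usize> = (0..files.len()).collect();
    order.sort_by_key(|&i| files[i].min);

    let mut groups: Vec<Vec<usize>> = Vec::new();
    for i in order {
        // Find the first group whose last file ends at or before this file starts.
        // Groups are never empty, so `last()` always returns Some.
        let slot = groups
            .iter()
            .position(|g| files[*g.last().unwrap()].max <= files[i].min);
        match slot {
            Some(g) => groups[g].push(i),
            None => groups.push(vec![i]),
        }
    }
    groups
}

fn main() {
    // Illustrative, partially overlapping ranges.
    let files = [
        FileStats { min: 0, max: 10 },
        FileStats { min: 5, max: 15 },
        FileStats { min: 11, max: 20 },
        FileStats { min: 16, max: 30 },
    ];
    println!("{:?}", first_fit_ordered_groups(&files)); // [[0, 2], [1, 3]]
}
```

Note that when all files are non-overlapping, as with the daily trades files, this pass produces a single group, which gives up parallelism. A real implementation would probably also need to balance the groups against target_partitions. The sketch leaves that trade-off open.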

At a minimum, PartitionedFile should have an additional optional field, statistics, containing a Statistics object with the min/max statistics for that file. FileScanConfig::project should be changed to detect when the files within each file group are in order. Lastly, a physical optimizer rule that redistributes files so that each file group is ordered may be necessary to take advantage of this in some cases.
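To make the proposal concrete, the sketch below models the kind of check FileScanConfig::project could perform. It uses simplified, hypothetical stand-ins. MinMax replaces the real Statistics object, PartitionedFile here is not DataFusion's actual definition, and ordering_is_preserved is an illustrative name. The declared output ordering is kept only if every file group can be shown to be ordered. A file without statistics makes its group unprovable, so the check conservatively returns false.

```rust
/// Simplified, hypothetical stand-in for min/max statistics of the sort column.
#[derive(Clone, Copy)]
struct MinMax {
    min: i64,
    max: i64,
}

/// Simplified, hypothetical stand-in for PartitionedFile with the proposed
/// optional `statistics` field. Not DataFusion's real definition.
#[allow(dead_code)]
struct PartitionedFile {
    path: String,
    statistics: Option<MinMax>,
}

/// Decides whether a scan may keep its declared output ordering: every file
/// group must emit rows in order when its files are read in sequence.
fn ordering_is_preserved(file_groups: &[Vec<PartitionedFile>]) -> bool {
    file_groups.iter().all(|group| {
        // A group with zero or one file keeps that file's own ordering.
        if group.len() <= 1 {
            return true;
        }
        group.windows(2).all(|pair| match (pair[0].statistics, pair[1].statistics) {
            // Consecutive files must not overlap on the sort column.
            (Some(prev), Some(next)) => prev.max <= next.min,
            // Without statistics nothing can be proven, so be conservative.
            _ => false,
        })
    })
}

fn main() {
    let file = |path: &str, stats: Option<(i64, i64)>| PartitionedFile {
        path: path.to_string(),
        statistics: stats.map(|(min, max)| MinMax { min, max }),
    };
    // Same grouping as the plan above, with illustrative timestamp ranges.
    let groups = vec![
        vec![file("day=05", Some((500, 599))), file("day=06", Some((600, 699)))],
        vec![file("day=03", Some((300, 399))), file("day=09", Some((900, 999)))],
        vec![file("day=04", Some((400, 499)))],
    ];
    println!("{}", ordering_is_preserved(&groups)); // true

    let missing = vec![vec![file("a", Some((0, 10))), file("b", None)]];
    println!("{}", ordering_is_preserved(&missing)); // false
}
```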

This does not solve the issue of how to feed file-level statistics into DataFusion, but users may add extensions to DataFusion that do so -- for example a custom TableProvider could do this. However, it should be feasible to integrate this feature into ListingTable. In fact, when collect_statistics is enabled, the ListingTable already fetches file-level statistics on each query, but discards them after rolling them up into one statistic per column.
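The information lost by the roll-up can be illustrated with a small sketch. It assumes that the per-column summary is computed as the minimum of the file minimums and the maximum of the file maximums (a simplification), and it uses a hypothetical MinMax type with i64 values. Under that assumption, two tables with very different file layouts produce the same summary.

```rust
/// Hypothetical min/max statistics of one column for one file.
#[derive(Debug, Clone, Copy, PartialEq)]
struct MinMax {
    min: i64,
    max: i64,
}

/// Rolls per-file statistics up into a single table-level value:
/// min of the mins and max of the maxes. Returns None for no files.
fn roll_up(files: &[MinMax]) -> Option<MinMax> {
    files.iter().copied().reduce(|acc, f| MinMax {
        min: acc.min.min(f.min),
        max: acc.max.max(f.max),
    })
}

fn main() {
    // Illustrative tables: one with disjoint files, one with overlapping files.
    let disjoint = [MinMax { min: 0, max: 9 }, MinMax { min: 10, max: 19 }];
    let overlapping = [MinMax { min: 0, max: 19 }, MinMax { min: 5, max: 15 }];

    // Both summaries are identical, so the rolled-up value alone cannot tell
    // whether the files could be streamed in order.
    println!("{:?}", roll_up(&disjoint)); // Some(MinMax { min: 0, max: 19 })
    println!("{:?}", roll_up(&overlapping)); // Some(MinMax { min: 0, max: 19 })
}
```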

Describe alternatives you've considered

At my company, we created a custom FileFormat implementation that outputs a wrapped ParquetExec with the output_ordering() method overridden and the files redistributed to be in order. However, instead of using statistics, it relies on hints from configuration provided by the user, and it does not seem to be in the spirit of what a FileFormat is meant to be. We would like to implement this optimization in a way that fits better with DataFusion and works out of the box without hints.

Additional context

No response
