diff --git a/datafusion/core/src/datasource/file_format/write/demux.rs b/datafusion/core/src/datasource/file_format/write/demux.rs index b03676d53271..56ded495c8a8 100644 --- a/datafusion/core/src/datasource/file_format/write/demux.rs +++ b/datafusion/core/src/datasource/file_format/write/demux.rs @@ -52,18 +52,33 @@ type RecordBatchReceiver = Receiver; type DemuxedStreamReceiver = UnboundedReceiver<(Path, RecordBatchReceiver)>; /// Splits a single [SendableRecordBatchStream] into a dynamically determined -/// number of partitions at execution time. The partitions are determined by -/// factors known only at execution time, such as total number of rows and -/// partition column values. The demuxer task communicates to the caller -/// by sending channels over a channel. The inner channels send RecordBatches -/// which should be contained within the same output file. The outer channel -/// is used to send a dynamic number of inner channels, representing a dynamic -/// number of total output files. The caller is also responsible to monitor -/// the demux task for errors and abort accordingly. A path with an extension will -/// force only a single file to be written with the extension from the path. Otherwise -/// the default extension will be used and the output will be split into multiple files. -/// partition_by parameter will additionally split the input based on the unique -/// values of a specific column ``` +/// number of partitions at execution time. +/// +/// The partitions are determined by factors known only at execution time, such +/// as total number of rows and partition column values. The demuxer task +/// communicates to the caller by sending channels over a channel. The inner +/// channels send RecordBatches which should be contained within the same output +/// file. The outer channel is used to send a dynamic number of inner channels, +/// representing a dynamic number of total output files. +/// +/// The caller is also responsible to monitor the demux task for errors and +/// abort accordingly. +/// +/// A path with an extension will force only a single file to +/// be written with the extension from the path. Otherwise the default extension +/// will be used and the output will be split into multiple files. +/// +/// Examples of `base_output_path` +/// * `tmp/dataset/` -> is a folder since it ends in `/` +/// * `tmp/dataset` -> is still a folder since it does not end in `/` but has no valid file extension +/// * `tmp/file.parquet` -> is a file since it does not end in `/` and has a valid file extension `.parquet` +/// * `tmp/file.parquet/` -> is a folder since it ends in `/` +/// +/// The `partition_by` parameter will additionally split the input based on the +/// unique values of a specific column, see +/// +/// +/// ```text /// ┌───────────┐ ┌────────────┐ ┌─────────────┐ /// ┌──────▶ │ batch 1 ├────▶...──────▶│ Batch a │ │ Output File1│ /// │ └───────────┘ └────────────┘ └─────────────┘ @@ -75,6 +90,7 @@ type DemuxedStreamReceiver = UnboundedReceiver<(Path, RecordBatchReceiver)>; /// └──────────┘ │ ┌───────────┐ ┌────────────┐ ┌─────────────┐ /// └──────▶ │ batch d ├────▶...──────▶│ Batch n │ │ Output FileN│ /// └───────────┘ └────────────┘ └─────────────┘ +/// ``` pub(crate) fn start_demuxer_task( input: SendableRecordBatchStream, context: &Arc, diff --git a/datafusion/core/src/datasource/listing/url.rs b/datafusion/core/src/datasource/listing/url.rs index e627cacfbfc7..2e8d314354c4 100644 --- a/datafusion/core/src/datasource/listing/url.rs +++ b/datafusion/core/src/datasource/listing/url.rs @@ -191,6 +191,17 @@ impl ListingTableUrl { } /// Returns the file extension of the last path segment if it exists + /// + /// Examples: + /// ```rust + /// use datafusion::datasource::listing::ListingTableUrl; + /// let url = ListingTableUrl::parse("file:///foo/bar.csv").unwrap(); + /// assert_eq!(url.file_extension(), Some("csv")); + /// let url = ListingTableUrl::parse("file:///foo/bar").unwrap(); + /// assert_eq!(url.file_extension(), None); + /// let url = ListingTableUrl::parse("file:///foo/bar.").unwrap(); + /// assert_eq!(url.file_extension(), None); + /// ``` pub fn file_extension(&self) -> Option<&str> { if let Some(segments) = self.url.path_segments() { if let Some(last_segment) = segments.last() {