Merged
40 changes: 28 additions & 12 deletions datafusion/core/src/datasource/file_format/write/demux.rs
@@ -52,18 +52,33 @@ type RecordBatchReceiver = Receiver<RecordBatch>;
type DemuxedStreamReceiver = UnboundedReceiver<(Path, RecordBatchReceiver)>;

/// Splits a single [SendableRecordBatchStream] into a dynamically determined
/// number of partitions at execution time. The partitions are determined by
/// factors known only at execution time, such as total number of rows and
/// partition column values. The demuxer task communicates to the caller
/// by sending channels over a channel. The inner channels send RecordBatches
/// which should be contained within the same output file. The outer channel
/// is used to send a dynamic number of inner channels, representing a dynamic
/// number of total output files. The caller is also responsible to monitor
/// the demux task for errors and abort accordingly. A path with an extension will
/// force only a single file to be written with the extension from the path. Otherwise
/// the default extension will be used and the output will be split into multiple files.
/// partition_by parameter will additionally split the input based on the unique
/// values of a specific column `<https://github.com/apache/datafusion/issues/7744>``
/// number of partitions at execution time.
///
/// The partitions are determined by factors known only at execution time, such
/// as total number of rows and partition column values. The demuxer task
/// communicates to the caller by sending channels over a channel. The inner
/// channels send RecordBatches which should be contained within the same output
/// file. The outer channel is used to send a dynamic number of inner channels,
/// representing a dynamic number of total output files.
///
/// The caller is also responsible for monitoring the demux task for errors and
/// aborting accordingly.
///
/// A path with an extension will force only a single file to
/// be written with the extension from the path. Otherwise the default extension
/// will be used and the output will be split into multiple files.
///
/// Examples of `base_output_path`
/// * `tmp/dataset/` -> is a folder since it ends in `/`
/// * `tmp/dataset` -> is still a folder: it does not end in `/`, but it has no valid file extension
/// * `tmp/file.parquet` -> is a file since it does not end in `/` and has a valid file extension `.parquet`
/// * `tmp/file.parquet/` -> is a folder since it ends in `/`
///
/// The `partition_by` parameter will additionally split the input based on the
/// unique values of a specific column, see
/// <https://github.com/apache/datafusion/issues/7744>
///
/// ```text
/// ┌───────────┐ ┌────────────┐ ┌─────────────┐
/// ┌──────▶ │ batch 1 ├────▶...──────▶│ Batch a │ │ Output File1│
/// │ └───────────┘ └────────────┘ └─────────────┘
@@ -75,6 +90,7 @@ type DemuxedStreamReceiver = UnboundedReceiver<(Path, RecordBatchReceiver)>;
/// └──────────┘ │ ┌───────────┐ ┌────────────┐ ┌─────────────┐
/// └──────▶ │ batch d ├────▶...──────▶│ Batch n │ │ Output FileN│
/// └───────────┘ └────────────┘ └─────────────┘
/// ```
pub(crate) fn start_demuxer_task(
input: SendableRecordBatchStream,
context: &Arc<TaskContext>,
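The "channels over a channel" pattern described in the doc comment above can be sketched with the standard library alone. This is a rough illustration, not the DataFusion implementation: `start_demux`, `collect_files`, the `Batch` stand-in type, and the `.parquet` naming are all hypothetical, and unbounded `std::sync::mpsc` channels plus a thread stand in for the async channels and task used by the real code.

```rust
use std::collections::HashMap;
use std::sync::mpsc::{channel, Receiver, Sender};
use std::thread;

// Stand-in for a RecordBatch (assumption, for illustration only).
type Batch = Vec<i32>;
type BatchReceiver = Receiver<Batch>;

/// Hypothetical demuxer: routes each (partition key, batch) pair to a
/// per-file inner channel. Every new inner channel is announced to the
/// caller over the outer channel, together with its output path.
fn start_demux(
    input: Vec<(String, Batch)>,
) -> (thread::JoinHandle<()>, Receiver<(String, BatchReceiver)>) {
    let (outer_tx, outer_rx) = channel();
    let handle = thread::spawn(move || {
        // One open inner sender per output file seen so far.
        let mut open: HashMap<String, Sender<Batch>> = HashMap::new();
        for (key, batch) in input {
            let tx = open.entry(key.clone()).or_insert_with(|| {
                let (tx, rx) = channel();
                // Send errors are ignored: the caller may have hung up.
                let _ = outer_tx.send((format!("{key}.parquet"), rx));
                tx
            });
            let _ = tx.send(batch);
        }
        // Dropping all senders here closes the outer and inner channels.
    });
    (handle, outer_rx)
}

/// Hypothetical caller side: drains every inner channel, one file at a time.
fn collect_files(outer_rx: Receiver<(String, BatchReceiver)>) -> Vec<(String, Vec<Batch>)> {
    outer_rx
        .into_iter()
        .map(|(path, rx)| (path, rx.into_iter().collect()))
        .collect()
}
```

In this sketch the caller would pass the outer receiver to `collect_files` and then call `join()` on the returned handle, which loosely corresponds to monitoring the demux task for errors. Draining the files one after another is only safe here because the channels are unbounded; with bounded inner channels the files would need to be consumed concurrently.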
11 changes: 11 additions & 0 deletions datafusion/core/src/datasource/listing/url.rs
@@ -191,6 +191,17 @@ impl ListingTableUrl {
}

/// Returns the file extension of the last path segment if it exists
///
/// Examples:
/// ```rust
/// use datafusion::datasource::listing::ListingTableUrl;
/// let url = ListingTableUrl::parse("file:///foo/bar.csv").unwrap();
/// assert_eq!(url.file_extension(), Some("csv"));
/// let url = ListingTableUrl::parse("file:///foo/bar").unwrap();
/// assert_eq!(url.file_extension(), None);
/// let url = ListingTableUrl::parse("file:///foo/bar.").unwrap();
/// assert_eq!(url.file_extension(), None);
/// ```
pub fn file_extension(&self) -> Option<&str> {
if let Some(segments) = self.url.path_segments() {
if let Some(last_segment) = segments.last() {
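The extension rule documented for `file_extension`, and the folder-versus-file examples listed for `base_output_path`, can be sketched on plain strings. This is an illustration under stated assumptions: the free functions `file_extension` and `is_folder` below are hypothetical and are not DataFusion's methods, and a "valid" extension is assumed to mean simply a non-empty one.

```rust
/// Hypothetical helper: returns the extension of the last `/`-separated
/// segment, or `None` if that segment has no `.` or ends with `.`.
fn file_extension(path: &str) -> Option<&str> {
    // A trailing `/` yields an empty last segment, hence no extension.
    let last_segment = path.rsplit('/').next()?;
    let (_, ext) = last_segment.rsplit_once('.')?;
    if ext.is_empty() {
        None
    } else {
        Some(ext)
    }
}

/// Hypothetical helper: a path is treated as a folder if it ends in `/`
/// or if its last segment has no (non-empty) extension.
fn is_folder(path: &str) -> bool {
    path.ends_with('/') || file_extension(path).is_none()
}
```

Under these assumptions, `tmp/dataset/`, `tmp/dataset`, and `tmp/file.parquet/` are all classified as folders, while `tmp/file.parquet` is classified as a single file.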