diff --git a/datafusion/common/src/file_options/file_type.rs b/datafusion/common/src/file_options/file_type.rs
index 2648f7289798..3abcb1d72455 100644
--- a/datafusion/common/src/file_options/file_type.rs
+++ b/datafusion/common/src/file_options/file_type.rs
@@ -30,6 +30,8 @@ pub const DEFAULT_CSV_EXTENSION: &str = ".csv";
 pub const DEFAULT_JSON_EXTENSION: &str = ".json";
 /// The default file extension of parquet files
 pub const DEFAULT_PARQUET_EXTENSION: &str = ".parquet";
+/// An internal file extension for extensionless single files
+pub const SINGLE_FILE_EXTENSION: &str = ".single";
 
 /// Define each `FileType`/`FileCompressionType`'s extension
 pub trait GetExt {
diff --git a/datafusion/core/src/dataframe/parquet.rs b/datafusion/core/src/dataframe/parquet.rs
index cb8a6cf29541..3c0d84ff5cea 100644
--- a/datafusion/core/src/dataframe/parquet.rs
+++ b/datafusion/core/src/dataframe/parquet.rs
@@ -26,6 +26,7 @@ use super::{
 };
 
 use datafusion_common::config::TableParquetOptions;
+use datafusion_common::file_options::file_type::SINGLE_FILE_EXTENSION;
 use datafusion_common::not_impl_err;
 use datafusion_expr::dml::InsertOp;
 
@@ -84,9 +85,17 @@ impl DataFrame {
             .build()?
         };
 
+        let path = if options.single_file_output {
+            let mut path = path.to_owned();
+            path.push_str(SINGLE_FILE_EXTENSION);
+            path
+        } else {
+            path.to_owned()
+        };
+
         let plan = LogicalPlanBuilder::copy_to(
             plan,
-            path.into(),
+            path,
             file_type,
             Default::default(),
             options.partition_by,
diff --git a/datafusion/core/src/execution/context/parquet.rs b/datafusion/core/src/execution/context/parquet.rs
index 731f7e59ecfa..a5d8b9ddbd0e 100644
--- a/datafusion/core/src/execution/context/parquet.rs
+++ b/datafusion/core/src/execution/context/parquet.rs
@@ -113,6 +113,7 @@ mod tests {
     };
 
     use datafusion_execution::config::SessionConfig;
+    use datafusion_expr::Partitioning;
     use tempfile::{tempdir, TempDir};
 
     #[tokio::test]
@@ -216,6 +217,98 @@ mod tests {
         Ok(())
     }
 
+    #[tokio::test]
+    async fn write_multiple_file_parquet_no_extensions() -> Result<()> {
+        let ctx = SessionContext::new();
+        let df = ctx.read_batch(RecordBatch::try_new(
+            Arc::new(Schema::new(vec![
+                Field::new("purchase_id", DataType::Int32, false),
+                Field::new("price", DataType::Float32, false),
+                Field::new("quantity", DataType::Int32, false),
+            ])),
+            vec![
+                Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5])),
+                Arc::new(Float32Array::from(vec![1.12, 3.40, 2.33, 9.10, 6.66])),
+                Arc::new(Int32Array::from(vec![1, 3, 2, 4, 3])),
+            ],
+        )?)?;
+
+        let partitioned_df = df.repartition(Partitioning::RoundRobinBatch(2))?;
+        let tmp_dir = tempdir()?;
+        let path = tmp_dir
+            .path()
+            .join("no_ext_parquet")
+            .to_str()
+            .unwrap()
+            .to_string();
+
+        let options = DataFrameWriteOptions::new();
+
+        partitioned_df.write_parquet(&path, options, None).await?;
+
+        let test_path = std::path::Path::new(&path);
+        assert!(
+            test_path.is_dir(),
+            "No extension and default DataFrameWriteOptions should have yielded a dir."
+        );
+
+        let mut count = 0;
+        if let Ok(dir) = test_path.read_dir() {
+            for result in dir {
+                if let Ok(entry) = result {
+                    if entry.file_type()?.is_file() {
+                        count += 1;
+                    }
+                }
+            }
+        }
+
+        // TODO: depending on file size this actually emits > 1
+        assert_eq!(
+            count, 1,
+            "Expected exactly one file in the output directory."
+        );
+
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn write_single_file_parquet_no_extensions() -> Result<()> {
+        let ctx = SessionContext::new();
+        let df = ctx.read_batch(RecordBatch::try_new(
+            Arc::new(Schema::new(vec![
+                Field::new("purchase_id", DataType::Int32, false),
+                Field::new("price", DataType::Float32, false),
+                Field::new("quantity", DataType::Int32, false),
+            ])),
+            vec![
+                Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5])),
+                Arc::new(Float32Array::from(vec![1.12, 3.40, 2.33, 9.10, 6.66])),
+                Arc::new(Int32Array::from(vec![1, 3, 2, 4, 3])),
+            ],
+        )?)?;
+        let partitioned_df = df.repartition(Partitioning::RoundRobinBatch(2))?;
+        let tmp_dir = tempdir()?;
+        let path = tmp_dir
+            .path()
+            .join("no_ext_parquet")
+            .to_str()
+            .unwrap()
+            .to_string();
+
+        let options = DataFrameWriteOptions::new().with_single_file_output(true);
+
+        partitioned_df.write_parquet(&path, options, None).await?;
+
+        let test_path = std::path::Path::new(&path);
+        assert!(
+            test_path.is_file(),
+            "No extension and DataFrameWriteOptions::with_single_file_output(true) should have yielded a single file."
+        );
+
+        Ok(())
+    }
+
     #[tokio::test]
     async fn read_from_different_file_extension() -> Result<()> {
         let ctx = SessionContext::new();
diff --git a/datafusion/datasource/src/write/demux.rs b/datafusion/datasource/src/write/demux.rs
index 52cb17c10453..088839b02a99 100644
--- a/datafusion/datasource/src/write/demux.rs
+++ b/datafusion/datasource/src/write/demux.rs
@@ -25,6 +25,7 @@ use std::sync::Arc;
 use crate::url::ListingTableUrl;
 use crate::write::FileSinkConfig;
 use datafusion_common::error::Result;
+use datafusion_common::file_options::file_type::SINGLE_FILE_EXTENSION;
 use datafusion_physical_plan::SendableRecordBatchStream;
 
 use arrow::array::{
@@ -106,8 +107,21 @@ pub(crate) fn start_demuxer_task(
     let file_extension = config.file_extension.clone();
     let base_output_path = config.table_paths[0].clone();
     let task = if config.table_partition_cols.is_empty() {
-        let single_file_output = !base_output_path.is_collection()
-            && base_output_path.file_extension().is_some();
+        let dot_free = SINGLE_FILE_EXTENSION.replace(".", "");
+        let single_file_output = (!base_output_path.is_collection()
+            && base_output_path.file_extension().is_some())
+            || base_output_path.file_extension() == Some(&dot_free);
+
+        let file_extension = if single_file_output {
+            file_extension.replace(SINGLE_FILE_EXTENSION, "")
+        } else {
+            file_extension
+        };
+
+        let base_output_path = ListingTableUrl::parse(
+            base_output_path.as_str().replace(SINGLE_FILE_EXTENSION, ""),
+        )
+        .unwrap_or(base_output_path);
         SpawnedTask::spawn(async move {
             row_count_demuxer(
                 tx,
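
For context, a minimal sketch of how the new option is exercised from user code, assuming a DataFusion build that includes this change. The `write_examples` function and the `/tmp` paths are hypothetical; `SessionContext`, `DataFrame::write_parquet`, and `DataFrameWriteOptions::with_single_file_output` are the APIs used by the tests above.

use datafusion::dataframe::DataFrameWriteOptions;
use datafusion::error::Result;
use datafusion::prelude::*;

async fn write_examples(ctx: &SessionContext) -> Result<()> {
    let df = ctx.sql("SELECT 1 AS purchase_id, 1.12 AS price").await?;

    // Default options: an extensionless path is treated as a directory
    // target, so this creates a directory containing one parquet file
    // per output partition.
    df.clone()
        .write_parquet("/tmp/out", DataFrameWriteOptions::new(), None)
        .await?;

    // With single_file_output, write_parquet appends the internal
    // ".single" marker to the path before planning; the demuxer detects
    // the marker, strips it from both the path and the file extension,
    // and writes exactly one file at the original extensionless path.
    df.write_parquet(
        "/tmp/out_single",
        DataFrameWriteOptions::new().with_single_file_output(true),
        None,
    )
    .await?;

    Ok(())
}

Routing the flag through a reserved extension keeps the COPY TO plan shape unchanged: only the path string carries the single-file intent, and the demuxer removes the marker again so it never appears on disk.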