1 change: 1 addition & 0 deletions Cargo.lock
(Generated file; diff not rendered by default.)
1 change: 1 addition & 0 deletions Cargo.toml
@@ -98,6 +98,7 @@ arrow-flight = { version = "55.1.0", features = [
] }
arrow-ipc = { version = "55.0.0", default-features = false, features = [
"lz4",
"zstd",
] }
arrow-ord = { version = "55.0.0", default-features = false }
arrow-schema = { version = "55.0.0", default-features = false }
67 changes: 67 additions & 0 deletions datafusion/common/src/config.rs
@@ -17,6 +17,8 @@

//! Runtime configuration, via [`ConfigOptions`]

use arrow_ipc::CompressionType;

use crate::error::_config_err;
use crate::parsers::CompressionTypeVariant;
use crate::utils::get_available_parallelism;
@@ -274,6 +276,61 @@ config_namespace! {
}
}

/// Compression codec applied to spill files written in the Arrow IPC Stream format.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub enum SpillCompression {
Zstd,
Lz4Frame,
#[default]
Uncompressed,
}
Comment on lines +279 to +285 (Contributor Author): I'll change the default codec after experiments.


impl FromStr for SpillCompression {
type Err = DataFusionError;

fn from_str(s: &str) -> Result<Self, Self::Err> {
match s.to_ascii_lowercase().as_str() {
"zstd" => Ok(Self::Zstd),
"lz4_frame" => Ok(Self::Lz4Frame),
"uncompressed" | "" => Ok(Self::Uncompressed),
other => Err(DataFusionError::Configuration(format!(
"Invalid Spill file compression type: {other}. Expected one of: zstd, lz4_frame, uncompressed"
))),
}
}
}

impl ConfigField for SpillCompression {
fn visit<V: Visit>(&self, v: &mut V, key: &str, description: &'static str) {
v.some(key, self, description)
}

fn set(&mut self, _: &str, value: &str) -> Result<()> {
*self = SpillCompression::from_str(value)?;
Ok(())
}
}

impl Display for SpillCompression {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let str = match self {
Self::Zstd => "zstd",
Self::Lz4Frame => "lz4_frame",
Self::Uncompressed => "uncompressed",
};
write!(f, "{str}")
}
}

impl From<SpillCompression> for Option<CompressionType> {
fn from(c: SpillCompression) -> Self {
match c {
SpillCompression::Zstd => Some(CompressionType::ZSTD),
SpillCompression::Lz4Frame => Some(CompressionType::LZ4_FRAME),
SpillCompression::Uncompressed => None,
}
}
}

config_namespace! {
/// Options related to query execution
///
@@ -330,6 +387,16 @@ config_namespace! {
/// the new schema verification step.
pub skip_physical_aggregate_schema_check: bool, default = false

/// Sets the compression codec used when spilling data to disk.
///
/// Since datafusion writes spill files using the Arrow IPC Stream format,
/// only codecs supported by the Arrow IPC Stream Writer are allowed.
/// Valid values are: uncompressed, lz4_frame, zstd.
/// Note: lz4_frame offers faster (de)compression, but typically results in
/// larger spill files. In contrast, zstd achieves
/// higher compression ratios at the cost of slower (de)compression speed.
pub spill_compression: SpillCompression, default = SpillCompression::Uncompressed

/// Specifies the reserved memory for each spillable sort operation to
/// facilitate an in-memory merge.
///
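Taken together, the `FromStr`, `Display`, and `From` impls above give the codec a clean round trip between its config-string form and Arrow IPC's `CompressionType`. A minimal sketch of how they compose (assuming the `datafusion-common` and `arrow-ipc` crates at this PR's revision):

```rust
use std::str::FromStr;

use arrow_ipc::CompressionType;
use datafusion_common::config::SpillCompression;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Parsing is case-insensitive, and "" maps to Uncompressed.
    let codec = SpillCompression::from_str("ZSTD")?;
    assert_eq!(codec, SpillCompression::Zstd);
    assert_eq!(codec.to_string(), "zstd");

    // The Arrow IPC writer takes Option<CompressionType>;
    // Uncompressed maps to None (no compression applied to the stream).
    let ipc: Option<CompressionType> = codec.into();
    assert_eq!(ipc, Some(CompressionType::ZSTD));
    assert_eq!(
        Option::<CompressionType>::from(SpillCompression::Uncompressed),
        None
    );
    Ok(())
}
```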
82 changes: 79 additions & 3 deletions datafusion/core/tests/memory_limit/mod.rs
@@ -28,6 +28,7 @@ use arrow::compute::SortOptions;
use arrow::datatypes::{Int32Type, SchemaRef};
use arrow_schema::{DataType, Field, Schema};
use datafusion::assert_batches_eq;
use datafusion::config::SpillCompression;
use datafusion::datasource::memory::MemorySourceConfig;
use datafusion::datasource::source::DataSourceExec;
use datafusion::datasource::{MemTable, TableProvider};
@@ -545,10 +546,11 @@ async fn test_external_sort_zero_merge_reservation() {
// Tests for disk limit (`max_temp_directory_size` in `DiskManager`)
// ------------------------------------------------------------------

// Create a new `SessionContext` with specified disk limit and memory pool limit
// Create a new `SessionContext` with specified disk limit, memory pool limit, and spill compression codec
async fn setup_context(
disk_limit: u64,
memory_pool_limit: usize,
spill_compression: SpillCompression,
) -> Result<SessionContext> {
let disk_manager = DiskManagerBuilder::default()
.with_mode(DiskManagerMode::OsTmpDirectory)
@@ -570,6 +572,7 @@ async fn setup_context(
let config = SessionConfig::new()
.with_sort_spill_reservation_bytes(64 * 1024) // 256KB
.with_sort_in_place_threshold_bytes(0)
.with_spill_compression(spill_compression)
.with_batch_size(64) // To reduce test memory usage
.with_target_partitions(1);

@@ -580,7 +583,8 @@
/// (specified by `max_temp_directory_size` in `DiskManager`)
#[tokio::test]
async fn test_disk_spill_limit_reached() -> Result<()> {
let ctx = setup_context(1024 * 1024, 1024 * 1024).await?; // 1MB disk limit, 1MB memory limit
let spill_compression = SpillCompression::Uncompressed;
let ctx = setup_context(1024 * 1024, 1024 * 1024, spill_compression).await?; // 1MB disk limit, 1MB memory limit

let df = ctx
.sql("select * from generate_series(1, 1000000000000) as t1(v1) order by v1")
@@ -602,7 +606,8 @@
#[tokio::test]
async fn test_disk_spill_limit_not_reached() -> Result<()> {
let disk_spill_limit = 1024 * 1024; // 1MB
let ctx = setup_context(disk_spill_limit, 128 * 1024).await?; // 1MB disk limit, 128KB memory limit
let spill_compression = SpillCompression::Uncompressed;
let ctx = setup_context(disk_spill_limit, 128 * 1024, spill_compression).await?; // 1MB disk limit, 128KB memory limit

let df = ctx
.sql("select * from generate_series(1, 10000) as t1(v1) order by v1")
@@ -630,6 +635,77 @@ async fn test_disk_spill_limit_not_reached() -> Result<()> {
Ok(())
}

/// External query should succeed using zstd as spill compression codec and
/// all temporary spill files are properly cleaned up after execution.
/// Note: This test does not inspect file contents (e.g. magic number),
/// as spill files are automatically deleted on drop.
#[tokio::test]
async fn test_spill_file_compressed_with_zstd() -> Result<()> {
let disk_spill_limit = 1024 * 1024; // 1MB
let spill_compression = SpillCompression::Zstd;
let ctx = setup_context(disk_spill_limit, 128 * 1024, spill_compression).await?; // 1MB disk limit, 128KB memory limit, zstd

let df = ctx
.sql("select * from generate_series(1, 100000) as t1(v1) order by v1")
.await
.unwrap();
let plan = df.create_physical_plan().await.unwrap();

let task_ctx = ctx.task_ctx();
let _ = collect_batches(Arc::clone(&plan), task_ctx)
.await
.expect("Query execution failed");

let spill_count = plan.metrics().unwrap().spill_count().unwrap();
let spilled_bytes = plan.metrics().unwrap().spilled_bytes().unwrap();

println!("spill count {spill_count}");
assert!(spill_count > 0);
assert!((spilled_bytes as u64) < disk_spill_limit);

// Verify that all temporary files have been properly cleaned up by checking
// that the total disk usage tracked by the disk manager is zero
let current_disk_usage = ctx.runtime_env().disk_manager.used_disk_space();
assert_eq!(current_disk_usage, 0);

Ok(())
}

/// External query should succeed using lz4_frame as spill compression codec and
/// all temporary spill files are properly cleaned up after execution.
/// Note: This test does not inspect file contents (e.g. magic number),
/// as spill files are automatically deleted on drop.
#[tokio::test]
async fn test_spill_file_compressed_with_lz4_frame() -> Result<()> {
Comment on lines +674 to +679 (Contributor Author): I added a test here, but unfortunately it is not straightforward to check whether the file is actually compressed with the desired codec in an e2e test. Maybe we can compare spilled_bytes after a follow-up fix.

let disk_spill_limit = 1024 * 1024; // 1MB
let spill_compression = SpillCompression::Lz4Frame;
let ctx = setup_context(disk_spill_limit, 128 * 1024, spill_compression).await?; // 1MB disk limit, 128KB memory limit, lz4_frame

let df = ctx
.sql("select * from generate_series(1, 100000) as t1(v1) order by v1")
.await
.unwrap();
let plan = df.create_physical_plan().await.unwrap();

let task_ctx = ctx.task_ctx();
let _ = collect_batches(Arc::clone(&plan), task_ctx)
.await
.expect("Query execution failed");

let spill_count = plan.metrics().unwrap().spill_count().unwrap();
let spilled_bytes = plan.metrics().unwrap().spilled_bytes().unwrap();

println!("spill count {spill_count}");
assert!(spill_count > 0);
assert!((spilled_bytes as u64) < disk_spill_limit);

// Verify that all temporary files have been properly cleaned up by checking
// that the total disk usage tracked by the disk manager is zero
let current_disk_usage = ctx.runtime_env().disk_manager.used_disk_space();
assert_eq!(current_disk_usage, 0);

Ok(())
}
/// Run the query with the specified memory limit,
/// and verifies the expected errors are returned
#[derive(Clone, Debug)]
15 changes: 14 additions & 1 deletion datafusion/execution/src/config.rs
@@ -23,7 +23,7 @@ use std::{
};

use datafusion_common::{
config::{ConfigExtension, ConfigOptions},
config::{ConfigExtension, ConfigOptions, SpillCompression},
Result, ScalarValue,
};

@@ -258,6 +258,11 @@ impl SessionConfig {
self.options.execution.collect_statistics
}

/// Compression codec for spill file
pub fn spill_compression(&self) -> SpillCompression {
self.options.execution.spill_compression
}

/// Selects a name for the default catalog and schema
pub fn with_default_catalog_and_schema(
mut self,
@@ -421,6 +426,14 @@ impl SessionConfig {
self
}

/// Set the compression codec [`spill_compression`] used when spilling data to disk.
///
/// [`spill_compression`]: datafusion_common::config::ExecutionOptions::spill_compression
pub fn with_spill_compression(mut self, spill_compression: SpillCompression) -> Self {
self.options.execution.spill_compression = spill_compression;
self
}

/// Set the size of [`sort_in_place_threshold_bytes`] to control
/// how sort does things.
///
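The setter slots into the existing builder chain, and because the field lives in the `execution` config namespace it should also be reachable as `datafusion.execution.spill_compression` through the usual SQL `SET` statement. A sketch under those assumptions, not a snippet from the PR:

```rust
use datafusion_common::config::SpillCompression;
use datafusion_execution::config::SessionConfig;

fn config_with_zstd_spills() -> SessionConfig {
    // Default is Uncompressed; opt this session's spill files into zstd.
    SessionConfig::new()
        .with_spill_compression(SpillCompression::Zstd)
        .with_batch_size(64)
}
```

At the SQL level the equivalent would be `SET datafusion.execution.spill_compression = 'zstd';`.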
3 changes: 2 additions & 1 deletion datafusion/physical-plan/src/aggregates/row_hash.rs
@@ -552,7 +552,8 @@ impl GroupedHashAggregateStream {
context.runtime_env(),
metrics::SpillMetrics::new(&agg.metrics, partition),
Arc::clone(&partial_agg_schema),
);
)
.with_compression_type(context.session_config().spill_compression());

let spill_state = SpillState {
spills: vec![],
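Each spilling operator in this PR is wired the same way: construct the `SpillManager`, then chain the new `with_compression_type` builder method with the session's configured codec. A condensed sketch of that pattern (the module paths and the standalone helper are assumptions for illustration, not code from the PR):

```rust
use std::sync::Arc;

use arrow::datatypes::SchemaRef;
use datafusion_common::config::SpillCompression;
use datafusion_execution::runtime_env::RuntimeEnv;
use datafusion_physical_plan::metrics::SpillMetrics;
use datafusion_physical_plan::spill::spill_manager::SpillManager;

/// What row_hash.rs above (and sort_merge_join.rs and sort.rs below)
/// each do in-line at their stream constructors.
fn spill_manager_for_operator(
    env: Arc<RuntimeEnv>,
    metrics: SpillMetrics,
    schema: SchemaRef,
    codec: SpillCompression,
) -> SpillManager {
    SpillManager::new(env, metrics, schema).with_compression_type(codec)
}
```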
7 changes: 6 additions & 1 deletion datafusion/physical-plan/src/joins/sort_merge_join.rs
@@ -61,6 +61,7 @@ use arrow::compute::{
use arrow::datatypes::{DataType, SchemaRef, TimeUnit};
use arrow::error::ArrowError;
use arrow::ipc::reader::StreamReader;
use datafusion_common::config::SpillCompression;
use datafusion_common::{
exec_err, internal_err, not_impl_err, plan_err, DataFusionError, HashSet, JoinSide,
JoinType, NullEquality, Result,
@@ -500,6 +501,7 @@ impl ExecutionPlan for SortMergeJoinExec {

// create join stream
Ok(Box::pin(SortMergeJoinStream::try_new(
context.session_config().spill_compression(),
Arc::clone(&self.schema),
self.sort_options.clone(),
self.null_equality,
@@ -1324,6 +1326,8 @@ impl Stream for SortMergeJoinStream {
impl SortMergeJoinStream {
#[allow(clippy::too_many_arguments)]
pub fn try_new(
// Configured via `datafusion.execution.spill_compression`.
spill_compression: SpillCompression,
Comment (Contributor): Nit: For readability, I suggest adding a comment indicating that this argument is passed through from the configuration xxx. In the future, we might consider enforcing a naming convention for such arguments, e.g., always using a cfg_ prefix like cfg_spill_compression.

Comment on lines +1329 to +1330 (Contributor Author): I made the changes. Thank you.

schema: SchemaRef,
sort_options: Vec<SortOptions>,
null_equality: NullEquality,
@@ -1344,7 +1348,8 @@ impl SortMergeJoinStream {
Arc::clone(&runtime_env),
join_metrics.spill_metrics.clone(),
Arc::clone(&buffered_schema),
);
)
.with_compression_type(spill_compression);
Ok(Self {
state: SortMergeJoinState::Init,
sort_options,
7 changes: 6 additions & 1 deletion datafusion/physical-plan/src/sorts/sort.rs
@@ -48,6 +48,7 @@ use crate::{
use arrow::array::{Array, RecordBatch, RecordBatchOptions, StringViewArray};
use arrow::compute::{concat_batches, lexsort_to_indices, take_arrays};
use arrow::datatypes::SchemaRef;
use datafusion_common::config::SpillCompression;
use datafusion_common::{internal_datafusion_err, internal_err, DataFusionError, Result};
use datafusion_execution::disk_manager::RefCountedTempFile;
use datafusion_execution::memory_pool::{MemoryConsumer, MemoryReservation};
@@ -258,6 +259,8 @@ impl ExternalSorter {
batch_size: usize,
sort_spill_reservation_bytes: usize,
sort_in_place_threshold_bytes: usize,
// Configured via `datafusion.execution.spill_compression`.
spill_compression: SpillCompression,
Comment on lines +262 to +263 (Contributor Author): Here, too.

metrics: &ExecutionPlanMetricsSet,
runtime: Arc<RuntimeEnv>,
) -> Result<Self> {
@@ -274,7 +277,8 @@ impl ExternalSorter {
Arc::clone(&runtime),
metrics.spill_metrics.clone(),
Arc::clone(&schema),
);
)
.with_compression_type(spill_compression);

Ok(Self {
schema,
@@ -1183,6 +1187,7 @@ impl ExecutionPlan for SortExec {
context.session_config().batch_size(),
execution_options.sort_spill_reservation_bytes,
execution_options.sort_in_place_threshold_bytes,
context.session_config().spill_compression(),
&self.metrics_set,
context.runtime_env(),
)?;
@@ -67,6 +67,7 @@ impl InProgressSpillFile {
self.writer = Some(IPCStreamWriter::new(
in_progress_file.path(),
schema.as_ref(),
self.spill_writer.compression,
)?);

// Update metrics
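`IPCStreamWriter` is DataFusion's wrapper around arrow-rs's IPC stream writer, so the compression argument threaded through above ultimately lands in `IpcWriteOptions`. Below is a self-contained sketch of the underlying arrow-rs mechanics; this is the library API the wrapper presumably drives, not code from the PR, and it also shows why the `zstd` feature had to be enabled on arrow-ipc in Cargo.toml:

```rust
use std::fs::File;
use std::sync::Arc;

use arrow::array::{Int64Array, RecordBatch};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::error::Result;
use arrow::ipc::writer::{IpcWriteOptions, StreamWriter};
use arrow::ipc::CompressionType;

fn write_compressed_spill(path: &str) -> Result<()> {
    let schema = Arc::new(Schema::new(vec![Field::new("v1", DataType::Int64, false)]));
    let batch = RecordBatch::try_new(
        Arc::clone(&schema),
        vec![Arc::new(Int64Array::from_iter_values(0..1024))],
    )?;

    // Passing None instead would write an uncompressed stream, which is
    // exactly what SpillCompression::Uncompressed maps to.
    let options =
        IpcWriteOptions::default().try_with_compression(Some(CompressionType::ZSTD))?;
    let mut writer =
        StreamWriter::try_new_with_options(File::create(path)?, &schema, options)?;
    writer.write(&batch)?;
    writer.finish()?;
    Ok(())
}
```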